async_net/
tcp.rs

1use std::fmt;
2use std::io::{self, IoSlice, Read as _, Write as _};
3use std::net::{Shutdown, SocketAddr};
4#[cfg(unix)]
5use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
6#[cfg(windows)]
7use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, OwnedSocket, RawSocket};
8use std::panic::{RefUnwindSafe, UnwindSafe};
9use std::pin::Pin;
10use std::sync::Arc;
11use std::task::{Context, Poll};
12
13use async_io::Async;
14use futures_lite::{prelude::*, ready};
15
16use crate::addr::AsyncToSocketAddrs;
17
18/// A TCP server, listening for connections.
19///
20/// After creating a [`TcpListener`] by [`bind`][`TcpListener::bind()`]ing it to an address, it
21/// listens for incoming TCP connections. These can be accepted by calling
22/// [`accept()`][`TcpListener::accept()`] or by awaiting items from the stream of
23/// [`incoming`][`TcpListener::incoming()`] connections.
24///
25/// Cloning a [`TcpListener`] creates another handle to the same socket. The socket will be closed
26/// when all handles to it are dropped.
27///
28/// The Transmission Control Protocol is specified in [IETF RFC 793].
29///
30/// [IETF RFC 793]: https://tools.ietf.org/html/rfc793
31///
32/// # Examples
33///
34/// ```no_run
35/// use async_net::TcpListener;
36/// use futures_lite::prelude::*;
37///
38/// # futures_lite::future::block_on(async {
39/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
40/// let mut incoming = listener.incoming();
41///
42/// while let Some(stream) = incoming.next().await {
43///     let mut stream = stream?;
44///     stream.write_all(b"hello").await?;
45/// }
46/// # std::io::Result::Ok(()) });
47/// ```
48#[derive(Clone, Debug)]
49pub struct TcpListener {
50    inner: Arc<Async<std::net::TcpListener>>,
51}
52
53impl TcpListener {
54    fn new(inner: Arc<Async<std::net::TcpListener>>) -> TcpListener {
55        TcpListener { inner }
56    }
57
58    /// Creates a new [`TcpListener`] bound to the given address.
59    ///
60    /// Binding with a port number of 0 will request that the operating system assigns an available
61    /// port to this listener. The assigned port can be queried via the
62    /// [`local_addr()`][`TcpListener::local_addr()`] method.
63    ///
64    /// If `addr` yields multiple addresses, binding will be attempted with each of the addresses
65    /// until one succeeds and returns the listener. If none of the addresses succeed in creating a
66    /// listener, the error from the last attempt is returned.
67    ///
68    /// # Examples
69    ///
70    /// Create a TCP listener bound to `127.0.0.1:80`:
71    ///
72    /// ```no_run
73    /// use async_net::TcpListener;
74    ///
75    /// # futures_lite::future::block_on(async {
76    /// let listener = TcpListener::bind("127.0.0.1:80").await?;
77    /// # std::io::Result::Ok(()) });
78    /// ```
79    ///
80    /// Create a TCP listener bound to `127.0.0.1:80`. If that address is unavailable, then try
81    /// binding to `127.0.0.1:443`:
82    ///
83    /// ```no_run
84    /// use async_net::{SocketAddr, TcpListener};
85    ///
86    /// # futures_lite::future::block_on(async {
87    /// let addrs = [
88    ///     SocketAddr::from(([127, 0, 0, 1], 80)),
89    ///     SocketAddr::from(([127, 0, 0, 1], 443)),
90    /// ];
91    /// let listener = TcpListener::bind(&addrs[..]).await.unwrap();
92    /// # std::io::Result::Ok(()) });
93    pub async fn bind<A: AsyncToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
94        let mut last_err = None;
95
96        for addr in addr.to_socket_addrs().await? {
97            match Async::<std::net::TcpListener>::bind(addr) {
98                Ok(listener) => return Ok(TcpListener::new(Arc::new(listener))),
99                Err(err) => last_err = Some(err),
100            }
101        }
102
103        Err(last_err.unwrap_or_else(|| {
104            io::Error::new(
105                io::ErrorKind::InvalidInput,
106                "could not resolve to any of the addresses",
107            )
108        }))
109    }
110
111    /// Returns the local address this listener is bound to.
112    ///
113    /// # Examples
114    ///
115    /// Bind to port 0 and then see which port was assigned by the operating system:
116    ///
117    /// ```no_run
118    /// use async_net::{SocketAddr, TcpListener};
119    ///
120    /// # futures_lite::future::block_on(async {
121    /// let listener = TcpListener::bind("127.0.0.1:0").await?;
122    /// println!("Listening on {}", listener.local_addr()?);
123    /// # std::io::Result::Ok(()) });
124    pub fn local_addr(&self) -> io::Result<SocketAddr> {
125        self.inner.get_ref().local_addr()
126    }
127
128    /// Accepts a new incoming connection.
129    ///
130    /// Returns a TCP stream and the address it is connected to.
131    ///
132    /// # Examples
133    ///
134    /// ```no_run
135    /// use async_net::TcpListener;
136    ///
137    /// # futures_lite::future::block_on(async {
138    /// let listener = TcpListener::bind("127.0.0.1:8080").await?;
139    /// let (stream, addr) = listener.accept().await?;
140    /// # std::io::Result::Ok(()) });
141    /// ```
142    pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
143        let (stream, addr) = self.inner.accept().await?;
144        Ok((TcpStream::new(Arc::new(stream)), addr))
145    }
146
147    /// Returns a stream of incoming connections.
148    ///
149    /// Iterating over this stream is equivalent to calling [`accept()`][`TcpListener::accept()`]
150    /// in a loop. The stream of connections is infinite, i.e awaiting the next connection will
151    /// never result in [`None`].
152    ///
153    /// # Examples
154    ///
155    /// ```no_run
156    /// use async_net::TcpListener;
157    /// use futures_lite::prelude::*;
158    ///
159    /// # futures_lite::future::block_on(async {
160    /// let listener = TcpListener::bind("127.0.0.1:0").await?;
161    /// let mut incoming = listener.incoming();
162    ///
163    /// while let Some(stream) = incoming.next().await {
164    ///     let mut stream = stream?;
165    ///     stream.write_all(b"hello").await?;
166    /// }
167    /// # std::io::Result::Ok(()) });
168    /// ```
169    pub fn incoming(&self) -> Incoming<'_> {
170        Incoming {
171            incoming: Box::pin(self.inner.incoming()),
172        }
173    }
174
175    /// Gets the value of the `IP_TTL` option for this socket.
176    ///
177    /// This option configures the time-to-live field that is used in every packet sent from this
178    /// socket.
179    ///
180    /// # Examples
181    ///
182    /// ```no_run
183    /// use async_net::TcpListener;
184    ///
185    /// # futures_lite::future::block_on(async {
186    /// let listener = TcpListener::bind("127.0.0.1:80").await?;
187    /// listener.set_ttl(100)?;
188    /// assert_eq!(listener.ttl()?, 100);
189    /// # std::io::Result::Ok(()) });
190    /// ```
191    pub fn ttl(&self) -> io::Result<u32> {
192        self.inner.get_ref().ttl()
193    }
194
195    /// Sets the value of the `IP_TTL` option for this socket.
196    ///
197    /// This option configures the time-to-live field that is used in every packet sent from this
198    /// socket.
199    ///
200    /// # Examples
201    ///
202    /// ```no_run
203    /// use async_net::TcpListener;
204    ///
205    /// # futures_lite::future::block_on(async {
206    /// let listener = TcpListener::bind("127.0.0.1:80").await?;
207    /// listener.set_ttl(100)?;
208    /// # std::io::Result::Ok(()) });
209    /// ```
210    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
211        self.inner.get_ref().set_ttl(ttl)
212    }
213}
214
215impl From<Async<std::net::TcpListener>> for TcpListener {
216    fn from(listener: Async<std::net::TcpListener>) -> TcpListener {
217        TcpListener::new(Arc::new(listener))
218    }
219}
220
221impl TryFrom<std::net::TcpListener> for TcpListener {
222    type Error = io::Error;
223
224    fn try_from(listener: std::net::TcpListener) -> io::Result<TcpListener> {
225        Ok(TcpListener::new(Arc::new(Async::new(listener)?)))
226    }
227}
228
229impl From<TcpListener> for Arc<Async<std::net::TcpListener>> {
230    fn from(val: TcpListener) -> Self {
231        val.inner
232    }
233}
234
235#[cfg(unix)]
236impl AsRawFd for TcpListener {
237    fn as_raw_fd(&self) -> RawFd {
238        self.inner.as_raw_fd()
239    }
240}
241
242#[cfg(unix)]
243impl AsFd for TcpListener {
244    fn as_fd(&self) -> BorrowedFd<'_> {
245        self.inner.get_ref().as_fd()
246    }
247}
248
249#[cfg(unix)]
250impl TryFrom<OwnedFd> for TcpListener {
251    type Error = io::Error;
252
253    fn try_from(value: OwnedFd) -> Result<Self, Self::Error> {
254        Self::try_from(std::net::TcpListener::from(value))
255    }
256}
257
258#[cfg(windows)]
259impl AsRawSocket for TcpListener {
260    fn as_raw_socket(&self) -> RawSocket {
261        self.inner.as_raw_socket()
262    }
263}
264
265#[cfg(windows)]
266impl AsSocket for TcpListener {
267    fn as_socket(&self) -> BorrowedSocket<'_> {
268        self.inner.get_ref().as_socket()
269    }
270}
271
272#[cfg(windows)]
273impl TryFrom<OwnedSocket> for TcpListener {
274    type Error = io::Error;
275
276    fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> {
277        Self::try_from(std::net::TcpListener::from(value))
278    }
279}
280
281/// A stream of incoming TCP connections.
282///
283/// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is
284/// created by the [`TcpListener::incoming()`] method.
285pub struct Incoming<'a> {
286    incoming:
287        Pin<Box<dyn Stream<Item = io::Result<Async<std::net::TcpStream>>> + Send + Sync + 'a>>,
288}
289
290impl Stream for Incoming<'_> {
291    type Item = io::Result<TcpStream>;
292
293    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
294        let res = ready!(Pin::new(&mut self.incoming).poll_next(cx));
295        Poll::Ready(res.map(|res| res.map(|stream| TcpStream::new(Arc::new(stream)))))
296    }
297}
298
299impl fmt::Debug for Incoming<'_> {
300    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
301        write!(f, "Incoming {{ ... }}")
302    }
303}
304
305/// A TCP connection.
306///
307/// A [`TcpStream`] can be created by [`connect`][`TcpStream::connect()`]ing to an endpoint or by
308/// [`accept`][`TcpListener::accept()`]ing an incoming connection.
309///
310/// [`TcpStream`] is a bidirectional stream that implements traits [`AsyncRead`] and
311/// [`AsyncWrite`].
312///
313/// Cloning a [`TcpStream`] creates another handle to the same socket. The socket will be closed
314/// when all handles to it are dropped. The reading and writing portions of the connection can also
315/// be shut down individually with the [`shutdown()`][`TcpStream::shutdown()`] method.
316///
317/// The Transmission Control Protocol is specified in [IETF RFC 793].
318///
319/// [IETF RFC 793]: https://tools.ietf.org/html/rfc793
320///
321/// # Examples
322///
323/// ```no_run
324/// use async_net::TcpStream;
325/// use futures_lite::prelude::*;
326///
327/// # futures_lite::future::block_on(async {
328/// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
329/// stream.write_all(b"hello").await?;
330///
331/// let mut buf = vec![0u8; 1024];
332/// let n = stream.read(&mut buf).await?;
333/// # std::io::Result::Ok(()) });
334/// ```
335pub struct TcpStream {
336    inner: Arc<Async<std::net::TcpStream>>,
337    readable: Option<async_io::ReadableOwned<std::net::TcpStream>>,
338    writable: Option<async_io::WritableOwned<std::net::TcpStream>>,
339}
340
341impl UnwindSafe for TcpStream {}
342impl RefUnwindSafe for TcpStream {}
343
344impl TcpStream {
345    fn new(inner: Arc<Async<std::net::TcpStream>>) -> TcpStream {
346        TcpStream {
347            inner,
348            readable: None,
349            writable: None,
350        }
351    }
352
353    /// Creates a TCP connection to the specified address.
354    ///
355    /// This method will create a new TCP socket and attempt to connect it to the provided `addr`,
356    ///
357    /// If `addr` yields multiple addresses, connecting will be attempted with each of the
358    /// addresses until connecting to one succeeds. If none of the addresses result in a successful
359    /// connection, the error from the last connect attempt is returned.
360    ///
361    /// # Examples
362    ///
363    /// Connect to `example.com:80`:
364    ///
365    /// ```
366    /// use async_net::TcpStream;
367    ///
368    /// # futures_lite::future::block_on(async {
369    /// let stream = TcpStream::connect("example.com:80").await?;
370    /// # std::io::Result::Ok(()) });
371    /// ```
372    ///
373    /// Connect to `127.0.0.1:8080`. If that fails, then try connecting to `127.0.0.1:8081`:
374    ///
375    /// ```no_run
376    /// use async_net::{SocketAddr, TcpStream};
377    ///
378    /// # futures_lite::future::block_on(async {
379    /// let addrs = [
380    ///     SocketAddr::from(([127, 0, 0, 1], 8080)),
381    ///     SocketAddr::from(([127, 0, 0, 1], 8081)),
382    /// ];
383    /// let stream = TcpStream::connect(&addrs[..]).await?;
384    /// # std::io::Result::Ok(()) });
385    /// ```
386    pub async fn connect<A: AsyncToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
387        let mut last_err = None;
388
389        for addr in addr.to_socket_addrs().await? {
390            match Async::<std::net::TcpStream>::connect(addr).await {
391                Ok(stream) => return Ok(TcpStream::new(Arc::new(stream))),
392                Err(e) => last_err = Some(e),
393            }
394        }
395
396        Err(last_err.unwrap_or_else(|| {
397            io::Error::new(
398                io::ErrorKind::InvalidInput,
399                "could not connect to any of the addresses",
400            )
401        }))
402    }
403
404    /// Returns the local address this stream is bound to.
405    ///
406    /// # Examples
407    ///
408    /// ```
409    /// use async_net::TcpStream;
410    ///
411    /// # futures_lite::future::block_on(async {
412    /// let stream = TcpStream::connect("example.com:80").await?;
413    /// println!("Local address is {}", stream.local_addr()?);
414    /// # std::io::Result::Ok(()) });
415    /// ```
416    pub fn local_addr(&self) -> io::Result<SocketAddr> {
417        self.inner.get_ref().local_addr()
418    }
419
420    /// Returns the remote address this stream is connected to.
421    ///
422    /// # Examples
423    ///
424    /// ```
425    /// use async_net::TcpStream;
426    ///
427    /// # futures_lite::future::block_on(async {
428    /// let stream = TcpStream::connect("example.com:80").await?;
429    /// println!("Connected to {}", stream.peer_addr()?);
430    /// # std::io::Result::Ok(()) });
431    /// ```
432    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
433        self.inner.get_ref().peer_addr()
434    }
435
436    /// Shuts down the read half, write half, or both halves of this connection.
437    ///
438    /// This method will cause all pending and future I/O in the given directions to return
439    /// immediately with an appropriate value (see the documentation of [`Shutdown`]).
440    ///
441    /// [`Shutdown`]: https://doc.rust-lang.org/std/net/enum.Shutdown.html
442    ///
443    /// # Examples
444    ///
445    /// ```no_run
446    /// use async_net::{Shutdown, TcpStream};
447    ///
448    /// # futures_lite::future::block_on(async {
449    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
450    /// stream.shutdown(Shutdown::Both)?;
451    /// # std::io::Result::Ok(()) });
452    /// ```
453    pub fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> {
454        self.inner.get_ref().shutdown(how)
455    }
456
457    /// Receives data without removing it from the queue.
458    ///
459    /// On success, returns the number of bytes peeked.
460    ///
461    /// Successive calls return the same data. This is accomplished by passing `MSG_PEEK` as a flag
462    /// to the underlying `recv` system call.
463    ///
464    /// # Examples
465    ///
466    /// ```no_run
467    /// use async_net::TcpStream;
468    ///
469    /// # futures_lite::future::block_on(async {
470    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
471    ///
472    /// let mut buf = vec![0; 1024];
473    /// let n = stream.peek(&mut buf).await?;
474    /// # std::io::Result::Ok(()) });
475    /// ```
476    pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
477        self.inner.peek(buf).await
478    }
479
480    /// Gets the value of the `TCP_NODELAY` option for this socket.
481    ///
482    /// If set to `true`, this option disables the [Nagle algorithm][nagle-wiki]. This means that
483    /// written data is always sent as soon as possible, even if there is only a small amount of
484    /// it.
485    ///
486    /// When set to `false`, written data is buffered until there is a certain amount to send out,
487    /// thereby avoiding the frequent sending of small packets.
488    ///
489    /// [nagle-wiki]: https://en.wikipedia.org/wiki/Nagle%27s_algorithm
490    ///
491    /// # Examples
492    ///
493    /// ```no_run
494    /// use async_net::TcpStream;
495    ///
496    /// # futures_lite::future::block_on(async {
497    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
498    /// println!("TCP_NODELAY is set to {}", stream.nodelay()?);
499    /// # std::io::Result::Ok(()) });
500    /// ```
501    pub fn nodelay(&self) -> io::Result<bool> {
502        self.inner.get_ref().nodelay()
503    }
504
505    /// Sets the value of the `TCP_NODELAY` option for this socket.
506    ///
507    /// If set to `true`, this option disables the [Nagle algorithm][nagle-wiki]. This means that
508    /// written data is always sent as soon as possible, even if there is only a small amount of
509    /// it.
510    ///
511    /// When set to `false`, written data is buffered until there is a certain amount to send out,
512    /// thereby avoiding the frequent sending of small packets.
513    ///
514    /// [nagle-wiki]: https://en.wikipedia.org/wiki/Nagle%27s_algorithm
515    ///
516    /// # Examples
517    ///
518    /// ```no_run
519    /// use async_net::TcpStream;
520    ///
521    /// # futures_lite::future::block_on(async {
522    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
523    /// stream.set_nodelay(false)?;
524    /// # std::io::Result::Ok(()) });
525    /// ```
526    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
527        self.inner.get_ref().set_nodelay(nodelay)
528    }
529
530    /// Gets the value of the `IP_TTL` option for this socket.
531    ///
532    /// This option configures the time-to-live field that is used in every packet sent from this
533    /// socket.
534    ///
535    /// # Examples
536    ///
537    /// ```no_run
538    /// use async_net::TcpStream;
539    ///
540    /// # futures_lite::future::block_on(async {
541    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
542    /// println!("IP_TTL is set to {}", stream.ttl()?);
543    /// # std::io::Result::Ok(()) });
544    /// ```
545    pub fn ttl(&self) -> io::Result<u32> {
546        self.inner.get_ref().ttl()
547    }
548
549    /// Sets the value of the `IP_TTL` option for this socket.
550    ///
551    /// This option configures the time-to-live field that is used in every packet sent from this
552    /// socket.
553    ///
554    /// # Examples
555    ///
556    /// ```no_run
557    /// use async_net::TcpStream;
558    ///
559    /// # futures_lite::future::block_on(async {
560    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
561    /// stream.set_ttl(100)?;
562    /// # std::io::Result::Ok(()) });
563    /// ```
564    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
565        self.inner.get_ref().set_ttl(ttl)
566    }
567}
568
569impl fmt::Debug for TcpStream {
570    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
571        self.inner.fmt(f)
572    }
573}
574
575impl Clone for TcpStream {
576    fn clone(&self) -> TcpStream {
577        TcpStream::new(self.inner.clone())
578    }
579}
580
581impl From<Async<std::net::TcpStream>> for TcpStream {
582    fn from(stream: Async<std::net::TcpStream>) -> TcpStream {
583        TcpStream::new(Arc::new(stream))
584    }
585}
586
587impl From<TcpStream> for Arc<Async<std::net::TcpStream>> {
588    fn from(val: TcpStream) -> Self {
589        val.inner
590    }
591}
592
593impl TryFrom<std::net::TcpStream> for TcpStream {
594    type Error = io::Error;
595
596    fn try_from(stream: std::net::TcpStream) -> io::Result<TcpStream> {
597        Ok(TcpStream::new(Arc::new(Async::new(stream)?)))
598    }
599}
600
601#[cfg(unix)]
602impl AsRawFd for TcpStream {
603    fn as_raw_fd(&self) -> RawFd {
604        self.inner.as_raw_fd()
605    }
606}
607
608#[cfg(unix)]
609impl AsFd for TcpStream {
610    fn as_fd(&self) -> BorrowedFd<'_> {
611        self.inner.get_ref().as_fd()
612    }
613}
614
615#[cfg(unix)]
616impl TryFrom<OwnedFd> for TcpStream {
617    type Error = io::Error;
618
619    fn try_from(value: OwnedFd) -> Result<Self, Self::Error> {
620        Self::try_from(std::net::TcpStream::from(value))
621    }
622}
623
624#[cfg(windows)]
625impl AsRawSocket for TcpStream {
626    fn as_raw_socket(&self) -> RawSocket {
627        self.inner.as_raw_socket()
628    }
629}
630
631#[cfg(windows)]
632impl AsSocket for TcpStream {
633    fn as_socket(&self) -> BorrowedSocket<'_> {
634        self.inner.get_ref().as_socket()
635    }
636}
637
638#[cfg(windows)]
639impl TryFrom<OwnedSocket> for TcpStream {
640    type Error = io::Error;
641
642    fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> {
643        Self::try_from(std::net::TcpStream::from(value))
644    }
645}
646
647impl AsyncRead for TcpStream {
648    fn poll_read(
649        mut self: Pin<&mut Self>,
650        cx: &mut Context<'_>,
651        buf: &mut [u8],
652    ) -> Poll<io::Result<usize>> {
653        loop {
654            // Attempt the non-blocking operation.
655            match self.inner.get_ref().read(buf) {
656                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
657                res => {
658                    self.readable = None;
659                    return Poll::Ready(res);
660                }
661            }
662
663            // Initialize the future to wait for readiness.
664            if self.readable.is_none() {
665                self.readable = Some(self.inner.clone().readable_owned());
666            }
667
668            // Poll the future for readiness.
669            if let Some(f) = &mut self.readable {
670                let res = ready!(Pin::new(f).poll(cx));
671                self.readable = None;
672                res?;
673            }
674        }
675    }
676}
677
678impl AsyncWrite for TcpStream {
679    fn poll_write(
680        mut self: Pin<&mut Self>,
681        cx: &mut Context<'_>,
682        buf: &[u8],
683    ) -> Poll<io::Result<usize>> {
684        loop {
685            // Attempt the non-blocking operation.
686            match self.inner.get_ref().write(buf) {
687                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
688                res => {
689                    self.writable = None;
690                    return Poll::Ready(res);
691                }
692            }
693
694            // Initialize the future to wait for readiness.
695            if self.writable.is_none() {
696                self.writable = Some(self.inner.clone().writable_owned());
697            }
698
699            // Poll the future for readiness.
700            if let Some(f) = &mut self.writable {
701                let res = ready!(Pin::new(f).poll(cx));
702                self.writable = None;
703                res?;
704            }
705        }
706    }
707
708    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
709        loop {
710            // Attempt the non-blocking operation.
711            match self.inner.get_ref().flush() {
712                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
713                res => {
714                    self.writable = None;
715                    return Poll::Ready(res);
716                }
717            }
718
719            // Initialize the future to wait for readiness.
720            if self.writable.is_none() {
721                self.writable = Some(self.inner.clone().writable_owned());
722            }
723
724            // Poll the future for readiness.
725            if let Some(f) = &mut self.writable {
726                let res = ready!(Pin::new(f).poll(cx));
727                self.writable = None;
728                res?;
729            }
730        }
731    }
732
733    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
734        Poll::Ready(self.inner.get_ref().shutdown(Shutdown::Write))
735    }
736
737    fn poll_write_vectored(
738        mut self: Pin<&mut Self>,
739        cx: &mut Context<'_>,
740        bufs: &[IoSlice<'_>],
741    ) -> Poll<io::Result<usize>> {
742        loop {
743            // Attempt the non-blocking operation.
744            match self.inner.get_ref().write_vectored(bufs) {
745                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
746                res => {
747                    self.writable = None;
748                    return Poll::Ready(res);
749                }
750            }
751
752            // Initialize the future to wait for readiness.
753            if self.writable.is_none() {
754                self.writable = Some(self.inner.clone().writable_owned());
755            }
756
757            // Poll the future for readiness.
758            if let Some(f) = &mut self.writable {
759                let res = ready!(Pin::new(f).poll(cx));
760                self.writable = None;
761                res?;
762            }
763        }
764    }
765}