async_net_mini/
tcp.rs

1use std::fmt;
2use std::io::{self, IoSlice, Read as _, Write as _};
3use std::net::{Shutdown, SocketAddr};
4#[cfg(unix)]
5use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
6#[cfg(windows)]
7use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, OwnedSocket, RawSocket};
8use std::panic::{RefUnwindSafe, UnwindSafe};
9use std::pin::Pin;
10use std::sync::Arc;
11use std::task::{Context, Poll};
12
13use async_io_mini::Async;
14use futures_lite::{prelude::*, ready};
15
16use crate::util::{ReadableOwned, WritableOwned};
17
18/// A TCP server, listening for connections.
19///
20/// After creating a [`TcpListener`] by [`bind`][`TcpListener::bind()`]ing it to an address, it
21/// listens for incoming TCP connections. These can be accepted by calling
22/// [`accept()`][`TcpListener::accept()`] or by awaiting items from the stream of
23/// [`incoming`][`TcpListener::incoming()`] connections.
24///
25/// Cloning a [`TcpListener`] creates another handle to the same socket. The socket will be closed
26/// when all handles to it are dropped.
27///
28/// The Transmission Control Protocol is specified in [IETF RFC 793].
29///
30/// [IETF RFC 793]: https://tools.ietf.org/html/rfc793
31///
32/// # Examples
33///
34/// ```no_run
35/// use async_net::TcpListener;
36/// use futures_lite::prelude::*;
37///
38/// # futures_lite::future::block_on(async {
39/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
40/// let mut incoming = listener.incoming();
41///
42/// while let Some(stream) = incoming.next().await {
43///     let mut stream = stream?;
44///     stream.write_all(b"hello").await?;
45/// }
46/// # std::io::Result::Ok(()) });
47/// ```
48#[derive(Clone, Debug)]
49pub struct TcpListener {
50    inner: Arc<Async<std::net::TcpListener>>,
51}
52
53impl TcpListener {
54    fn new(inner: Arc<Async<std::net::TcpListener>>) -> TcpListener {
55        TcpListener { inner }
56    }
57
58    /// Creates a new [`TcpListener`] bound to the given address.
59    ///
60    /// Binding with a port number of 0 will request that the operating system assigns an available
61    /// port to this listener. The assigned port can be queried via the
62    /// [`local_addr()`][`TcpListener::local_addr()`] method.
63    ///
64    /// If `addr` yields multiple addresses, binding will be attempted with each of the addresses
65    /// until one succeeds and returns the listener. If none of the addresses succeed in creating a
66    /// listener, the error from the last attempt is returned.
67    ///
68    /// # Examples
69    ///
70    /// Create a TCP listener bound to `127.0.0.1:80`:
71    ///
72    /// ```no_run
73    /// use async_net::TcpListener;
74    ///
75    /// # futures_lite::future::block_on(async {
76    /// let listener = TcpListener::bind("127.0.0.1:80").await?;
77    /// # std::io::Result::Ok(()) });
78    /// ```
79    ///
80    /// Create a TCP listener bound to `127.0.0.1:80`. If that address is unavailable, then try
81    /// binding to `127.0.0.1:443`:
82    ///
83    /// ```no_run
84    /// use async_net::{SocketAddr, TcpListener};
85    ///
86    /// # futures_lite::future::block_on(async {
87    /// let addrs = [
88    ///     SocketAddr::from(([127, 0, 0, 1], 80)),
89    ///     SocketAddr::from(([127, 0, 0, 1], 443)),
90    /// ];
91    /// let listener = TcpListener::bind(&addrs[..]).await.unwrap();
92    /// # std::io::Result::Ok(()) });
93    pub async fn bind(addr: SocketAddr) -> io::Result<TcpListener> {
94        Ok(TcpListener::new(Arc::new(
95            Async::<std::net::TcpListener>::bind(addr)?,
96        )))
97    }
98
99    /// Returns the local address this listener is bound to.
100    ///
101    /// # Examples
102    ///
103    /// Bind to port 0 and then see which port was assigned by the operating system:
104    ///
105    /// ```no_run
106    /// use async_net::{SocketAddr, TcpListener};
107    ///
108    /// # futures_lite::future::block_on(async {
109    /// let listener = TcpListener::bind("127.0.0.1:0").await?;
110    /// println!("Listening on {}", listener.local_addr()?);
111    /// # std::io::Result::Ok(()) });
112    pub fn local_addr(&self) -> io::Result<SocketAddr> {
113        self.inner.get_ref().local_addr()
114    }
115
116    /// Accepts a new incoming connection.
117    ///
118    /// Returns a TCP stream and the address it is connected to.
119    ///
120    /// # Examples
121    ///
122    /// ```no_run
123    /// use async_net::TcpListener;
124    ///
125    /// # futures_lite::future::block_on(async {
126    /// let listener = TcpListener::bind("127.0.0.1:8080").await?;
127    /// let (stream, addr) = listener.accept().await?;
128    /// # std::io::Result::Ok(()) });
129    /// ```
130    pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
131        let (stream, addr) = self.inner.accept().await?;
132        Ok((TcpStream::new(Arc::new(stream)), addr))
133    }
134
135    /// Returns a stream of incoming connections.
136    ///
137    /// Iterating over this stream is equivalent to calling [`accept()`][`TcpListener::accept()`]
138    /// in a loop. The stream of connections is infinite, i.e awaiting the next connection will
139    /// never result in [`None`].
140    ///
141    /// # Examples
142    ///
143    /// ```no_run
144    /// use async_net::TcpListener;
145    /// use futures_lite::prelude::*;
146    ///
147    /// # futures_lite::future::block_on(async {
148    /// let listener = TcpListener::bind("127.0.0.1:0").await?;
149    /// let mut incoming = listener.incoming();
150    ///
151    /// while let Some(stream) = incoming.next().await {
152    ///     let mut stream = stream?;
153    ///     stream.write_all(b"hello").await?;
154    /// }
155    /// # std::io::Result::Ok(()) });
156    /// ```
157    pub fn incoming(&self) -> Incoming<'_> {
158        Incoming {
159            incoming: Box::pin(self.inner.incoming()),
160        }
161    }
162
163    /// Gets the value of the `IP_TTL` option for this socket.
164    ///
165    /// This option configures the time-to-live field that is used in every packet sent from this
166    /// socket.
167    ///
168    /// # Examples
169    ///
170    /// ```no_run
171    /// use async_net::TcpListener;
172    ///
173    /// # futures_lite::future::block_on(async {
174    /// let listener = TcpListener::bind("127.0.0.1:80").await?;
175    /// listener.set_ttl(100)?;
176    /// assert_eq!(listener.ttl()?, 100);
177    /// # std::io::Result::Ok(()) });
178    /// ```
179    pub fn ttl(&self) -> io::Result<u32> {
180        self.inner.get_ref().ttl()
181    }
182
183    /// Sets the value of the `IP_TTL` option for this socket.
184    ///
185    /// This option configures the time-to-live field that is used in every packet sent from this
186    /// socket.
187    ///
188    /// # Examples
189    ///
190    /// ```no_run
191    /// use async_net::TcpListener;
192    ///
193    /// # futures_lite::future::block_on(async {
194    /// let listener = TcpListener::bind("127.0.0.1:80").await?;
195    /// listener.set_ttl(100)?;
196    /// # std::io::Result::Ok(()) });
197    /// ```
198    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
199        self.inner.get_ref().set_ttl(ttl)
200    }
201}
202
203impl From<Async<std::net::TcpListener>> for TcpListener {
204    fn from(listener: Async<std::net::TcpListener>) -> TcpListener {
205        TcpListener::new(Arc::new(listener))
206    }
207}
208
209impl TryFrom<std::net::TcpListener> for TcpListener {
210    type Error = io::Error;
211
212    fn try_from(listener: std::net::TcpListener) -> io::Result<TcpListener> {
213        Ok(TcpListener::new(Arc::new(Async::new(listener)?)))
214    }
215}
216
217impl From<TcpListener> for Arc<Async<std::net::TcpListener>> {
218    fn from(val: TcpListener) -> Self {
219        val.inner
220    }
221}
222
223#[cfg(unix)]
224impl AsRawFd for TcpListener {
225    fn as_raw_fd(&self) -> RawFd {
226        self.inner.as_raw_fd()
227    }
228}
229
230#[cfg(unix)]
231impl AsFd for TcpListener {
232    fn as_fd(&self) -> BorrowedFd<'_> {
233        self.inner.get_ref().as_fd()
234    }
235}
236
237#[cfg(unix)]
238impl TryFrom<OwnedFd> for TcpListener {
239    type Error = io::Error;
240
241    fn try_from(value: OwnedFd) -> Result<Self, Self::Error> {
242        Self::try_from(std::net::TcpListener::from(value))
243    }
244}
245
246#[cfg(windows)]
247impl AsRawSocket for TcpListener {
248    fn as_raw_socket(&self) -> RawSocket {
249        self.inner.as_raw_socket()
250    }
251}
252
253#[cfg(windows)]
254impl AsSocket for TcpListener {
255    fn as_socket(&self) -> BorrowedSocket<'_> {
256        self.inner.get_ref().as_socket()
257    }
258}
259
260#[cfg(windows)]
261impl TryFrom<OwnedSocket> for TcpListener {
262    type Error = io::Error;
263
264    fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> {
265        Self::try_from(std::net::TcpListener::from(value))
266    }
267}
268
269/// A stream of incoming TCP connections.
270///
271/// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is
272/// created by the [`TcpListener::incoming()`] method.
273pub struct Incoming<'a> {
274    incoming:
275        Pin<Box<dyn Stream<Item = io::Result<Async<std::net::TcpStream>>> + Send + Sync + 'a>>,
276}
277
278impl Stream for Incoming<'_> {
279    type Item = io::Result<TcpStream>;
280
281    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
282        let res = ready!(Pin::new(&mut self.incoming).poll_next(cx));
283        Poll::Ready(res.map(|res| res.map(|stream| TcpStream::new(Arc::new(stream)))))
284    }
285}
286
287impl fmt::Debug for Incoming<'_> {
288    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289        write!(f, "Incoming {{ ... }}")
290    }
291}
292
293/// A TCP connection.
294///
295/// A [`TcpStream`] can be created by [`connect`][`TcpStream::connect()`]ing to an endpoint or by
296/// [`accept`][`TcpListener::accept()`]ing an incoming connection.
297///
298/// [`TcpStream`] is a bidirectional stream that implements traits [`AsyncRead`] and
299/// [`AsyncWrite`].
300///
301/// Cloning a [`TcpStream`] creates another handle to the same socket. The socket will be closed
302/// when all handles to it are dropped. The reading and writing portions of the connection can also
303/// be shut down individually with the [`shutdown()`][`TcpStream::shutdown()`] method.
304///
305/// The Transmission Control Protocol is specified in [IETF RFC 793].
306///
307/// [IETF RFC 793]: https://tools.ietf.org/html/rfc793
308///
309/// # Examples
310///
311/// ```no_run
312/// use async_net::TcpStream;
313/// use futures_lite::prelude::*;
314///
315/// # futures_lite::future::block_on(async {
316/// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
317/// stream.write_all(b"hello").await?;
318///
319/// let mut buf = vec![0u8; 1024];
320/// let n = stream.read(&mut buf).await?;
321/// # std::io::Result::Ok(()) });
322/// ```
323pub struct TcpStream {
324    inner: Arc<Async<std::net::TcpStream>>,
325    readable: Option<ReadableOwned<std::net::TcpStream>>,
326    writable: Option<WritableOwned<std::net::TcpStream>>,
327}
328
329impl UnwindSafe for TcpStream {}
330impl RefUnwindSafe for TcpStream {}
331
332impl TcpStream {
333    fn new(inner: Arc<Async<std::net::TcpStream>>) -> TcpStream {
334        TcpStream {
335            inner,
336            readable: None,
337            writable: None,
338        }
339    }
340
341    /// Creates a TCP connection to the specified address.
342    ///
343    /// This method will create a new TCP socket and attempt to connect it to the provided `addr`,
344    ///
345    /// If `addr` yields multiple addresses, connecting will be attempted with each of the
346    /// addresses until connecting to one succeeds. If none of the addresses result in a successful
347    /// connection, the error from the last connect attempt is returned.
348    ///
349    /// # Examples
350    ///
351    /// Connect to `example.com:80`:
352    ///
353    /// ```
354    /// use async_net::TcpStream;
355    ///
356    /// # futures_lite::future::block_on(async {
357    /// let stream = TcpStream::connect("example.com:80").await?;
358    /// # std::io::Result::Ok(()) });
359    /// ```
360    ///
361    /// Connect to `127.0.0.1:8080`. If that fails, then try connecting to `127.0.0.1:8081`:
362    ///
363    /// ```no_run
364    /// use async_net::{SocketAddr, TcpStream};
365    ///
366    /// # futures_lite::future::block_on(async {
367    /// let addrs = [
368    ///     SocketAddr::from(([127, 0, 0, 1], 8080)),
369    ///     SocketAddr::from(([127, 0, 0, 1], 8081)),
370    /// ];
371    /// let stream = TcpStream::connect(&addrs[..]).await?;
372    /// # std::io::Result::Ok(()) });
373    /// ```
374    pub async fn connect(addr: SocketAddr) -> io::Result<TcpStream> {
375        Ok(TcpStream::new(Arc::new(
376            Async::<std::net::TcpStream>::connect(addr).await?,
377        )))
378    }
379
380    /// Returns the local address this stream is bound to.
381    ///
382    /// # Examples
383    ///
384    /// ```
385    /// use async_net::TcpStream;
386    ///
387    /// # futures_lite::future::block_on(async {
388    /// let stream = TcpStream::connect("example.com:80").await?;
389    /// println!("Local address is {}", stream.local_addr()?);
390    /// # std::io::Result::Ok(()) });
391    /// ```
392    pub fn local_addr(&self) -> io::Result<SocketAddr> {
393        self.inner.get_ref().local_addr()
394    }
395
396    /// Returns the remote address this stream is connected to.
397    ///
398    /// # Examples
399    ///
400    /// ```
401    /// use async_net::TcpStream;
402    ///
403    /// # futures_lite::future::block_on(async {
404    /// let stream = TcpStream::connect("example.com:80").await?;
405    /// println!("Connected to {}", stream.peer_addr()?);
406    /// # std::io::Result::Ok(()) });
407    /// ```
408    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
409        self.inner.get_ref().peer_addr()
410    }
411
412    /// Shuts down the read half, write half, or both halves of this connection.
413    ///
414    /// This method will cause all pending and future I/O in the given directions to return
415    /// immediately with an appropriate value (see the documentation of [`Shutdown`]).
416    ///
417    /// [`Shutdown`]: https://doc.rust-lang.org/std/net/enum.Shutdown.html
418    ///
419    /// # Examples
420    ///
421    /// ```no_run
422    /// use async_net::{Shutdown, TcpStream};
423    ///
424    /// # futures_lite::future::block_on(async {
425    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
426    /// stream.shutdown(Shutdown::Both)?;
427    /// # std::io::Result::Ok(()) });
428    /// ```
429    pub fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> {
430        self.inner.get_ref().shutdown(how)
431    }
432
433    /// Receives data without removing it from the queue.
434    ///
435    /// On success, returns the number of bytes peeked.
436    ///
437    /// Successive calls return the same data. This is accomplished by passing `MSG_PEEK` as a flag
438    /// to the underlying `recv` system call.
439    ///
440    /// # Examples
441    ///
442    /// ```no_run
443    /// use async_net::TcpStream;
444    ///
445    /// # futures_lite::future::block_on(async {
446    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
447    ///
448    /// let mut buf = vec![0; 1024];
449    /// let n = stream.peek(&mut buf).await?;
450    /// # std::io::Result::Ok(()) });
451    /// ```
452    pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
453        self.inner.peek(buf).await
454    }
455
456    /// Gets the value of the `TCP_NODELAY` option for this socket.
457    ///
458    /// If set to `true`, this option disables the [Nagle algorithm][nagle-wiki]. This means that
459    /// written data is always sent as soon as possible, even if there is only a small amount of
460    /// it.
461    ///
462    /// When set to `false`, written data is buffered until there is a certain amount to send out,
463    /// thereby avoiding the frequent sending of small packets.
464    ///
465    /// [nagle-wiki]: https://en.wikipedia.org/wiki/Nagle%27s_algorithm
466    ///
467    /// # Examples
468    ///
469    /// ```no_run
470    /// use async_net::TcpStream;
471    ///
472    /// # futures_lite::future::block_on(async {
473    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
474    /// println!("TCP_NODELAY is set to {}", stream.nodelay()?);
475    /// # std::io::Result::Ok(()) });
476    /// ```
477    pub fn nodelay(&self) -> io::Result<bool> {
478        self.inner.get_ref().nodelay()
479    }
480
481    /// Sets the value of the `TCP_NODELAY` option for this socket.
482    ///
483    /// If set to `true`, this option disables the [Nagle algorithm][nagle-wiki]. This means that
484    /// written data is always sent as soon as possible, even if there is only a small amount of
485    /// it.
486    ///
487    /// When set to `false`, written data is buffered until there is a certain amount to send out,
488    /// thereby avoiding the frequent sending of small packets.
489    ///
490    /// [nagle-wiki]: https://en.wikipedia.org/wiki/Nagle%27s_algorithm
491    ///
492    /// # Examples
493    ///
494    /// ```no_run
495    /// use async_net::TcpStream;
496    ///
497    /// # futures_lite::future::block_on(async {
498    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
499    /// stream.set_nodelay(false)?;
500    /// # std::io::Result::Ok(()) });
501    /// ```
502    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
503        self.inner.get_ref().set_nodelay(nodelay)
504    }
505
506    /// Gets the value of the `IP_TTL` option for this socket.
507    ///
508    /// This option configures the time-to-live field that is used in every packet sent from this
509    /// socket.
510    ///
511    /// # Examples
512    ///
513    /// ```no_run
514    /// use async_net::TcpStream;
515    ///
516    /// # futures_lite::future::block_on(async {
517    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
518    /// println!("IP_TTL is set to {}", stream.ttl()?);
519    /// # std::io::Result::Ok(()) });
520    /// ```
521    pub fn ttl(&self) -> io::Result<u32> {
522        self.inner.get_ref().ttl()
523    }
524
525    /// Sets the value of the `IP_TTL` option for this socket.
526    ///
527    /// This option configures the time-to-live field that is used in every packet sent from this
528    /// socket.
529    ///
530    /// # Examples
531    ///
532    /// ```no_run
533    /// use async_net::TcpStream;
534    ///
535    /// # futures_lite::future::block_on(async {
536    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
537    /// stream.set_ttl(100)?;
538    /// # std::io::Result::Ok(()) });
539    /// ```
540    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
541        self.inner.get_ref().set_ttl(ttl)
542    }
543}
544
545impl fmt::Debug for TcpStream {
546    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
547        self.inner.fmt(f)
548    }
549}
550
551impl Clone for TcpStream {
552    fn clone(&self) -> TcpStream {
553        TcpStream::new(self.inner.clone())
554    }
555}
556
557impl From<Async<std::net::TcpStream>> for TcpStream {
558    fn from(stream: Async<std::net::TcpStream>) -> TcpStream {
559        TcpStream::new(Arc::new(stream))
560    }
561}
562
563impl From<TcpStream> for Arc<Async<std::net::TcpStream>> {
564    fn from(val: TcpStream) -> Self {
565        val.inner
566    }
567}
568
569impl TryFrom<std::net::TcpStream> for TcpStream {
570    type Error = io::Error;
571
572    fn try_from(stream: std::net::TcpStream) -> io::Result<TcpStream> {
573        Ok(TcpStream::new(Arc::new(Async::new(stream)?)))
574    }
575}
576
577#[cfg(unix)]
578impl AsRawFd for TcpStream {
579    fn as_raw_fd(&self) -> RawFd {
580        self.inner.as_raw_fd()
581    }
582}
583
584#[cfg(unix)]
585impl AsFd for TcpStream {
586    fn as_fd(&self) -> BorrowedFd<'_> {
587        self.inner.get_ref().as_fd()
588    }
589}
590
591#[cfg(unix)]
592impl TryFrom<OwnedFd> for TcpStream {
593    type Error = io::Error;
594
595    fn try_from(value: OwnedFd) -> Result<Self, Self::Error> {
596        Self::try_from(std::net::TcpStream::from(value))
597    }
598}
599
600#[cfg(windows)]
601impl AsRawSocket for TcpStream {
602    fn as_raw_socket(&self) -> RawSocket {
603        self.inner.as_raw_socket()
604    }
605}
606
607#[cfg(windows)]
608impl AsSocket for TcpStream {
609    fn as_socket(&self) -> BorrowedSocket<'_> {
610        self.inner.get_ref().as_socket()
611    }
612}
613
614#[cfg(windows)]
615impl TryFrom<OwnedSocket> for TcpStream {
616    type Error = io::Error;
617
618    fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> {
619        Self::try_from(std::net::TcpStream::from(value))
620    }
621}
622
623impl AsyncRead for TcpStream {
624    fn poll_read(
625        mut self: Pin<&mut Self>,
626        cx: &mut Context<'_>,
627        buf: &mut [u8],
628    ) -> Poll<io::Result<usize>> {
629        loop {
630            // Attempt the non-blocking operation.
631            match self.inner.get_ref().read(buf) {
632                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
633                res => {
634                    self.readable = None;
635                    return Poll::Ready(res);
636                }
637            }
638
639            // Initialize the future to wait for readiness.
640            if self.readable.is_none() {
641                self.readable = Some(ReadableOwned::new(self.inner.clone()));
642            }
643
644            // Poll the future for readiness.
645            if let Some(f) = &mut self.readable {
646                let res = ready!(Pin::new(f).poll(cx));
647                self.readable = None;
648                res?;
649            }
650        }
651    }
652}
653
654impl AsyncWrite for TcpStream {
655    fn poll_write(
656        mut self: Pin<&mut Self>,
657        cx: &mut Context<'_>,
658        buf: &[u8],
659    ) -> Poll<io::Result<usize>> {
660        loop {
661            // Attempt the non-blocking operation.
662            match self.inner.get_ref().write(buf) {
663                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
664                res => {
665                    self.writable = None;
666                    return Poll::Ready(res);
667                }
668            }
669
670            // Initialize the future to wait for readiness.
671            if self.writable.is_none() {
672                self.writable = Some(WritableOwned::new(self.inner.clone()));
673            }
674
675            // Poll the future for readiness.
676            if let Some(f) = &mut self.writable {
677                let res = ready!(Pin::new(f).poll(cx));
678                self.writable = None;
679                res?;
680            }
681        }
682    }
683
684    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
685        loop {
686            // Attempt the non-blocking operation.
687            match self.inner.get_ref().flush() {
688                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
689                res => {
690                    self.writable = None;
691                    return Poll::Ready(res);
692                }
693            }
694
695            // Initialize the future to wait for readiness.
696            if self.writable.is_none() {
697                self.writable = Some(WritableOwned::new(self.inner.clone()));
698            }
699
700            // Poll the future for readiness.
701            if let Some(f) = &mut self.writable {
702                let res = ready!(Pin::new(f).poll(cx));
703                self.writable = None;
704                res?;
705            }
706        }
707    }
708
709    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
710        Poll::Ready(self.inner.get_ref().shutdown(Shutdown::Write))
711    }
712
713    fn poll_write_vectored(
714        mut self: Pin<&mut Self>,
715        cx: &mut Context<'_>,
716        bufs: &[IoSlice<'_>],
717    ) -> Poll<io::Result<usize>> {
718        loop {
719            // Attempt the non-blocking operation.
720            match self.inner.get_ref().write_vectored(bufs) {
721                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
722                res => {
723                    self.writable = None;
724                    return Poll::Ready(res);
725                }
726            }
727
728            // Initialize the future to wait for readiness.
729            if self.writable.is_none() {
730                self.writable = Some(WritableOwned::new(self.inner.clone()));
731            }
732
733            // Poll the future for readiness.
734            if let Some(f) = &mut self.writable {
735                let res = ready!(Pin::new(f).poll(cx));
736                self.writable = None;
737                res?;
738            }
739        }
740    }
741}