async_net_mini/tcp.rs
1use std::fmt;
2use std::io::{self, IoSlice, Read as _, Write as _};
3use std::net::{Shutdown, SocketAddr};
4#[cfg(unix)]
5use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
6#[cfg(windows)]
7use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, OwnedSocket, RawSocket};
8use std::panic::{RefUnwindSafe, UnwindSafe};
9use std::pin::Pin;
10use std::sync::Arc;
11use std::task::{Context, Poll};
12
13use async_io_mini::Async;
14use futures_lite::{prelude::*, ready};
15
16use crate::util::{ReadableOwned, WritableOwned};
17
/// A TCP server, listening for connections.
///
/// After creating a [`TcpListener`] by [`bind`][`TcpListener::bind()`]ing it to an address, it
/// listens for incoming TCP connections. These can be accepted by calling
/// [`accept()`][`TcpListener::accept()`] or by awaiting items from the stream of
/// [`incoming`][`TcpListener::incoming()`] connections.
///
/// Cloning a [`TcpListener`] creates another handle to the same socket. The socket will be closed
/// when all handles to it are dropped.
///
/// The Transmission Control Protocol is specified in [IETF RFC 793].
///
/// [IETF RFC 793]: https://tools.ietf.org/html/rfc793
///
/// # Examples
///
/// ```no_run
/// use std::net::SocketAddr;
///
/// use async_net::TcpListener;
/// use futures_lite::prelude::*;
///
/// # futures_lite::future::block_on(async {
/// let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 8080))).await?;
/// let mut incoming = listener.incoming();
///
/// while let Some(stream) = incoming.next().await {
///     let mut stream = stream?;
///     stream.write_all(b"hello").await?;
/// }
/// # std::io::Result::Ok(()) });
/// ```
#[derive(Clone, Debug)]
pub struct TcpListener {
    // Reference-counted so that every clone of the listener shares one registered socket.
    inner: Arc<Async<std::net::TcpListener>>,
}
52
53impl TcpListener {
54 fn new(inner: Arc<Async<std::net::TcpListener>>) -> TcpListener {
55 TcpListener { inner }
56 }
57
58 /// Creates a new [`TcpListener`] bound to the given address.
59 ///
60 /// Binding with a port number of 0 will request that the operating system assigns an available
61 /// port to this listener. The assigned port can be queried via the
62 /// [`local_addr()`][`TcpListener::local_addr()`] method.
63 ///
64 /// If `addr` yields multiple addresses, binding will be attempted with each of the addresses
65 /// until one succeeds and returns the listener. If none of the addresses succeed in creating a
66 /// listener, the error from the last attempt is returned.
67 ///
68 /// # Examples
69 ///
70 /// Create a TCP listener bound to `127.0.0.1:80`:
71 ///
72 /// ```no_run
73 /// use async_net::TcpListener;
74 ///
75 /// # futures_lite::future::block_on(async {
76 /// let listener = TcpListener::bind("127.0.0.1:80").await?;
77 /// # std::io::Result::Ok(()) });
78 /// ```
79 ///
80 /// Create a TCP listener bound to `127.0.0.1:80`. If that address is unavailable, then try
81 /// binding to `127.0.0.1:443`:
82 ///
83 /// ```no_run
84 /// use async_net::{SocketAddr, TcpListener};
85 ///
86 /// # futures_lite::future::block_on(async {
87 /// let addrs = [
88 /// SocketAddr::from(([127, 0, 0, 1], 80)),
89 /// SocketAddr::from(([127, 0, 0, 1], 443)),
90 /// ];
91 /// let listener = TcpListener::bind(&addrs[..]).await.unwrap();
92 /// # std::io::Result::Ok(()) });
93 pub async fn bind(addr: SocketAddr) -> io::Result<TcpListener> {
94 Ok(TcpListener::new(Arc::new(
95 Async::<std::net::TcpListener>::bind(addr)?,
96 )))
97 }
98
99 /// Returns the local address this listener is bound to.
100 ///
101 /// # Examples
102 ///
103 /// Bind to port 0 and then see which port was assigned by the operating system:
104 ///
105 /// ```no_run
106 /// use async_net::{SocketAddr, TcpListener};
107 ///
108 /// # futures_lite::future::block_on(async {
109 /// let listener = TcpListener::bind("127.0.0.1:0").await?;
110 /// println!("Listening on {}", listener.local_addr()?);
111 /// # std::io::Result::Ok(()) });
112 pub fn local_addr(&self) -> io::Result<SocketAddr> {
113 self.inner.get_ref().local_addr()
114 }
115
116 /// Accepts a new incoming connection.
117 ///
118 /// Returns a TCP stream and the address it is connected to.
119 ///
120 /// # Examples
121 ///
122 /// ```no_run
123 /// use async_net::TcpListener;
124 ///
125 /// # futures_lite::future::block_on(async {
126 /// let listener = TcpListener::bind("127.0.0.1:8080").await?;
127 /// let (stream, addr) = listener.accept().await?;
128 /// # std::io::Result::Ok(()) });
129 /// ```
130 pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
131 let (stream, addr) = self.inner.accept().await?;
132 Ok((TcpStream::new(Arc::new(stream)), addr))
133 }
134
135 /// Returns a stream of incoming connections.
136 ///
137 /// Iterating over this stream is equivalent to calling [`accept()`][`TcpListener::accept()`]
138 /// in a loop. The stream of connections is infinite, i.e awaiting the next connection will
139 /// never result in [`None`].
140 ///
141 /// # Examples
142 ///
143 /// ```no_run
144 /// use async_net::TcpListener;
145 /// use futures_lite::prelude::*;
146 ///
147 /// # futures_lite::future::block_on(async {
148 /// let listener = TcpListener::bind("127.0.0.1:0").await?;
149 /// let mut incoming = listener.incoming();
150 ///
151 /// while let Some(stream) = incoming.next().await {
152 /// let mut stream = stream?;
153 /// stream.write_all(b"hello").await?;
154 /// }
155 /// # std::io::Result::Ok(()) });
156 /// ```
157 pub fn incoming(&self) -> Incoming<'_> {
158 Incoming {
159 incoming: Box::pin(self.inner.incoming()),
160 }
161 }
162
163 /// Gets the value of the `IP_TTL` option for this socket.
164 ///
165 /// This option configures the time-to-live field that is used in every packet sent from this
166 /// socket.
167 ///
168 /// # Examples
169 ///
170 /// ```no_run
171 /// use async_net::TcpListener;
172 ///
173 /// # futures_lite::future::block_on(async {
174 /// let listener = TcpListener::bind("127.0.0.1:80").await?;
175 /// listener.set_ttl(100)?;
176 /// assert_eq!(listener.ttl()?, 100);
177 /// # std::io::Result::Ok(()) });
178 /// ```
179 pub fn ttl(&self) -> io::Result<u32> {
180 self.inner.get_ref().ttl()
181 }
182
183 /// Sets the value of the `IP_TTL` option for this socket.
184 ///
185 /// This option configures the time-to-live field that is used in every packet sent from this
186 /// socket.
187 ///
188 /// # Examples
189 ///
190 /// ```no_run
191 /// use async_net::TcpListener;
192 ///
193 /// # futures_lite::future::block_on(async {
194 /// let listener = TcpListener::bind("127.0.0.1:80").await?;
195 /// listener.set_ttl(100)?;
196 /// # std::io::Result::Ok(()) });
197 /// ```
198 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
199 self.inner.get_ref().set_ttl(ttl)
200 }
201}
202
203impl From<Async<std::net::TcpListener>> for TcpListener {
204 fn from(listener: Async<std::net::TcpListener>) -> TcpListener {
205 TcpListener::new(Arc::new(listener))
206 }
207}
208
209impl TryFrom<std::net::TcpListener> for TcpListener {
210 type Error = io::Error;
211
212 fn try_from(listener: std::net::TcpListener) -> io::Result<TcpListener> {
213 Ok(TcpListener::new(Arc::new(Async::new(listener)?)))
214 }
215}
216
217impl From<TcpListener> for Arc<Async<std::net::TcpListener>> {
218 fn from(val: TcpListener) -> Self {
219 val.inner
220 }
221}
222
223#[cfg(unix)]
224impl AsRawFd for TcpListener {
225 fn as_raw_fd(&self) -> RawFd {
226 self.inner.as_raw_fd()
227 }
228}
229
230#[cfg(unix)]
231impl AsFd for TcpListener {
232 fn as_fd(&self) -> BorrowedFd<'_> {
233 self.inner.get_ref().as_fd()
234 }
235}
236
237#[cfg(unix)]
238impl TryFrom<OwnedFd> for TcpListener {
239 type Error = io::Error;
240
241 fn try_from(value: OwnedFd) -> Result<Self, Self::Error> {
242 Self::try_from(std::net::TcpListener::from(value))
243 }
244}
245
246#[cfg(windows)]
247impl AsRawSocket for TcpListener {
248 fn as_raw_socket(&self) -> RawSocket {
249 self.inner.as_raw_socket()
250 }
251}
252
253#[cfg(windows)]
254impl AsSocket for TcpListener {
255 fn as_socket(&self) -> BorrowedSocket<'_> {
256 self.inner.get_ref().as_socket()
257 }
258}
259
260#[cfg(windows)]
261impl TryFrom<OwnedSocket> for TcpListener {
262 type Error = io::Error;
263
264 fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> {
265 Self::try_from(std::net::TcpListener::from(value))
266 }
267}
268
/// A stream of incoming TCP connections.
///
/// This stream is infinite, i.e. awaiting the next connection will never result in [`None`]. It is
/// created by the [`TcpListener::incoming()`] method.
pub struct Incoming<'a> {
    // Type-erased, pinned stream of raw `Async` sockets; the `'a` lifetime ties it to the
    // listener it was created from. Each item is wrapped into a `TcpStream` when polled.
    incoming:
        Pin<Box<dyn Stream<Item = io::Result<Async<std::net::TcpStream>>> + Send + Sync + 'a>>,
}
277
278impl Stream for Incoming<'_> {
279 type Item = io::Result<TcpStream>;
280
281 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
282 let res = ready!(Pin::new(&mut self.incoming).poll_next(cx));
283 Poll::Ready(res.map(|res| res.map(|stream| TcpStream::new(Arc::new(stream)))))
284 }
285}
286
287impl fmt::Debug for Incoming<'_> {
288 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289 write!(f, "Incoming {{ ... }}")
290 }
291}
292
/// A TCP connection.
///
/// A [`TcpStream`] can be created by [`connect`][`TcpStream::connect()`]ing to an endpoint or by
/// [`accept`][`TcpListener::accept()`]ing an incoming connection.
///
/// [`TcpStream`] is a bidirectional stream that implements traits [`AsyncRead`] and
/// [`AsyncWrite`].
///
/// Cloning a [`TcpStream`] creates another handle to the same socket. The socket will be closed
/// when all handles to it are dropped. The reading and writing portions of the connection can also
/// be shut down individually with the [`shutdown()`][`TcpStream::shutdown()`] method.
///
/// The Transmission Control Protocol is specified in [IETF RFC 793].
///
/// [IETF RFC 793]: https://tools.ietf.org/html/rfc793
///
/// # Examples
///
/// ```no_run
/// use std::net::SocketAddr;
///
/// use async_net::TcpStream;
/// use futures_lite::prelude::*;
///
/// # futures_lite::future::block_on(async {
/// let mut stream = TcpStream::connect(SocketAddr::from(([127, 0, 0, 1], 8080))).await?;
/// stream.write_all(b"hello").await?;
///
/// let mut buf = vec![0u8; 1024];
/// let n = stream.read(&mut buf).await?;
/// # std::io::Result::Ok(()) });
/// ```
pub struct TcpStream {
    // Reference-counted so that every clone of the stream shares one registered socket.
    inner: Arc<Async<std::net::TcpStream>>,
    // Readiness future kept between `poll_read` calls while a read is waiting; `None` otherwise.
    readable: Option<ReadableOwned<std::net::TcpStream>>,
    // Readiness future kept between write/flush polls while one is waiting; `None` otherwise.
    writable: Option<WritableOwned<std::net::TcpStream>>,
}
328
// Explicitly marks `TcpStream` as safe to use across an unwind boundary.
// NOTE(review): presumably needed because the cached `readable`/`writable` futures do not get
// these auto traits on their own — confirm before removing.
impl UnwindSafe for TcpStream {}
impl RefUnwindSafe for TcpStream {}
331
332impl TcpStream {
333 fn new(inner: Arc<Async<std::net::TcpStream>>) -> TcpStream {
334 TcpStream {
335 inner,
336 readable: None,
337 writable: None,
338 }
339 }
340
341 /// Creates a TCP connection to the specified address.
342 ///
343 /// This method will create a new TCP socket and attempt to connect it to the provided `addr`,
344 ///
345 /// If `addr` yields multiple addresses, connecting will be attempted with each of the
346 /// addresses until connecting to one succeeds. If none of the addresses result in a successful
347 /// connection, the error from the last connect attempt is returned.
348 ///
349 /// # Examples
350 ///
351 /// Connect to `example.com:80`:
352 ///
353 /// ```
354 /// use async_net::TcpStream;
355 ///
356 /// # futures_lite::future::block_on(async {
357 /// let stream = TcpStream::connect("example.com:80").await?;
358 /// # std::io::Result::Ok(()) });
359 /// ```
360 ///
361 /// Connect to `127.0.0.1:8080`. If that fails, then try connecting to `127.0.0.1:8081`:
362 ///
363 /// ```no_run
364 /// use async_net::{SocketAddr, TcpStream};
365 ///
366 /// # futures_lite::future::block_on(async {
367 /// let addrs = [
368 /// SocketAddr::from(([127, 0, 0, 1], 8080)),
369 /// SocketAddr::from(([127, 0, 0, 1], 8081)),
370 /// ];
371 /// let stream = TcpStream::connect(&addrs[..]).await?;
372 /// # std::io::Result::Ok(()) });
373 /// ```
374 pub async fn connect(addr: SocketAddr) -> io::Result<TcpStream> {
375 Ok(TcpStream::new(Arc::new(
376 Async::<std::net::TcpStream>::connect(addr).await?,
377 )))
378 }
379
380 /// Returns the local address this stream is bound to.
381 ///
382 /// # Examples
383 ///
384 /// ```
385 /// use async_net::TcpStream;
386 ///
387 /// # futures_lite::future::block_on(async {
388 /// let stream = TcpStream::connect("example.com:80").await?;
389 /// println!("Local address is {}", stream.local_addr()?);
390 /// # std::io::Result::Ok(()) });
391 /// ```
392 pub fn local_addr(&self) -> io::Result<SocketAddr> {
393 self.inner.get_ref().local_addr()
394 }
395
396 /// Returns the remote address this stream is connected to.
397 ///
398 /// # Examples
399 ///
400 /// ```
401 /// use async_net::TcpStream;
402 ///
403 /// # futures_lite::future::block_on(async {
404 /// let stream = TcpStream::connect("example.com:80").await?;
405 /// println!("Connected to {}", stream.peer_addr()?);
406 /// # std::io::Result::Ok(()) });
407 /// ```
408 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
409 self.inner.get_ref().peer_addr()
410 }
411
412 /// Shuts down the read half, write half, or both halves of this connection.
413 ///
414 /// This method will cause all pending and future I/O in the given directions to return
415 /// immediately with an appropriate value (see the documentation of [`Shutdown`]).
416 ///
417 /// [`Shutdown`]: https://doc.rust-lang.org/std/net/enum.Shutdown.html
418 ///
419 /// # Examples
420 ///
421 /// ```no_run
422 /// use async_net::{Shutdown, TcpStream};
423 ///
424 /// # futures_lite::future::block_on(async {
425 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
426 /// stream.shutdown(Shutdown::Both)?;
427 /// # std::io::Result::Ok(()) });
428 /// ```
429 pub fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> {
430 self.inner.get_ref().shutdown(how)
431 }
432
433 /// Receives data without removing it from the queue.
434 ///
435 /// On success, returns the number of bytes peeked.
436 ///
437 /// Successive calls return the same data. This is accomplished by passing `MSG_PEEK` as a flag
438 /// to the underlying `recv` system call.
439 ///
440 /// # Examples
441 ///
442 /// ```no_run
443 /// use async_net::TcpStream;
444 ///
445 /// # futures_lite::future::block_on(async {
446 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
447 ///
448 /// let mut buf = vec![0; 1024];
449 /// let n = stream.peek(&mut buf).await?;
450 /// # std::io::Result::Ok(()) });
451 /// ```
452 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
453 self.inner.peek(buf).await
454 }
455
456 /// Gets the value of the `TCP_NODELAY` option for this socket.
457 ///
458 /// If set to `true`, this option disables the [Nagle algorithm][nagle-wiki]. This means that
459 /// written data is always sent as soon as possible, even if there is only a small amount of
460 /// it.
461 ///
462 /// When set to `false`, written data is buffered until there is a certain amount to send out,
463 /// thereby avoiding the frequent sending of small packets.
464 ///
465 /// [nagle-wiki]: https://en.wikipedia.org/wiki/Nagle%27s_algorithm
466 ///
467 /// # Examples
468 ///
469 /// ```no_run
470 /// use async_net::TcpStream;
471 ///
472 /// # futures_lite::future::block_on(async {
473 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
474 /// println!("TCP_NODELAY is set to {}", stream.nodelay()?);
475 /// # std::io::Result::Ok(()) });
476 /// ```
477 pub fn nodelay(&self) -> io::Result<bool> {
478 self.inner.get_ref().nodelay()
479 }
480
481 /// Sets the value of the `TCP_NODELAY` option for this socket.
482 ///
483 /// If set to `true`, this option disables the [Nagle algorithm][nagle-wiki]. This means that
484 /// written data is always sent as soon as possible, even if there is only a small amount of
485 /// it.
486 ///
487 /// When set to `false`, written data is buffered until there is a certain amount to send out,
488 /// thereby avoiding the frequent sending of small packets.
489 ///
490 /// [nagle-wiki]: https://en.wikipedia.org/wiki/Nagle%27s_algorithm
491 ///
492 /// # Examples
493 ///
494 /// ```no_run
495 /// use async_net::TcpStream;
496 ///
497 /// # futures_lite::future::block_on(async {
498 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
499 /// stream.set_nodelay(false)?;
500 /// # std::io::Result::Ok(()) });
501 /// ```
502 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
503 self.inner.get_ref().set_nodelay(nodelay)
504 }
505
506 /// Gets the value of the `IP_TTL` option for this socket.
507 ///
508 /// This option configures the time-to-live field that is used in every packet sent from this
509 /// socket.
510 ///
511 /// # Examples
512 ///
513 /// ```no_run
514 /// use async_net::TcpStream;
515 ///
516 /// # futures_lite::future::block_on(async {
517 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
518 /// println!("IP_TTL is set to {}", stream.ttl()?);
519 /// # std::io::Result::Ok(()) });
520 /// ```
521 pub fn ttl(&self) -> io::Result<u32> {
522 self.inner.get_ref().ttl()
523 }
524
525 /// Sets the value of the `IP_TTL` option for this socket.
526 ///
527 /// This option configures the time-to-live field that is used in every packet sent from this
528 /// socket.
529 ///
530 /// # Examples
531 ///
532 /// ```no_run
533 /// use async_net::TcpStream;
534 ///
535 /// # futures_lite::future::block_on(async {
536 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
537 /// stream.set_ttl(100)?;
538 /// # std::io::Result::Ok(()) });
539 /// ```
540 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
541 self.inner.get_ref().set_ttl(ttl)
542 }
543}
544
545impl fmt::Debug for TcpStream {
546 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
547 self.inner.fmt(f)
548 }
549}
550
551impl Clone for TcpStream {
552 fn clone(&self) -> TcpStream {
553 TcpStream::new(self.inner.clone())
554 }
555}
556
557impl From<Async<std::net::TcpStream>> for TcpStream {
558 fn from(stream: Async<std::net::TcpStream>) -> TcpStream {
559 TcpStream::new(Arc::new(stream))
560 }
561}
562
563impl From<TcpStream> for Arc<Async<std::net::TcpStream>> {
564 fn from(val: TcpStream) -> Self {
565 val.inner
566 }
567}
568
569impl TryFrom<std::net::TcpStream> for TcpStream {
570 type Error = io::Error;
571
572 fn try_from(stream: std::net::TcpStream) -> io::Result<TcpStream> {
573 Ok(TcpStream::new(Arc::new(Async::new(stream)?)))
574 }
575}
576
577#[cfg(unix)]
578impl AsRawFd for TcpStream {
579 fn as_raw_fd(&self) -> RawFd {
580 self.inner.as_raw_fd()
581 }
582}
583
584#[cfg(unix)]
585impl AsFd for TcpStream {
586 fn as_fd(&self) -> BorrowedFd<'_> {
587 self.inner.get_ref().as_fd()
588 }
589}
590
591#[cfg(unix)]
592impl TryFrom<OwnedFd> for TcpStream {
593 type Error = io::Error;
594
595 fn try_from(value: OwnedFd) -> Result<Self, Self::Error> {
596 Self::try_from(std::net::TcpStream::from(value))
597 }
598}
599
600#[cfg(windows)]
601impl AsRawSocket for TcpStream {
602 fn as_raw_socket(&self) -> RawSocket {
603 self.inner.as_raw_socket()
604 }
605}
606
607#[cfg(windows)]
608impl AsSocket for TcpStream {
609 fn as_socket(&self) -> BorrowedSocket<'_> {
610 self.inner.get_ref().as_socket()
611 }
612}
613
614#[cfg(windows)]
615impl TryFrom<OwnedSocket> for TcpStream {
616 type Error = io::Error;
617
618 fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> {
619 Self::try_from(std::net::TcpStream::from(value))
620 }
621}
622
623impl AsyncRead for TcpStream {
624 fn poll_read(
625 mut self: Pin<&mut Self>,
626 cx: &mut Context<'_>,
627 buf: &mut [u8],
628 ) -> Poll<io::Result<usize>> {
629 loop {
630 // Attempt the non-blocking operation.
631 match self.inner.get_ref().read(buf) {
632 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
633 res => {
634 self.readable = None;
635 return Poll::Ready(res);
636 }
637 }
638
639 // Initialize the future to wait for readiness.
640 if self.readable.is_none() {
641 self.readable = Some(ReadableOwned::new(self.inner.clone()));
642 }
643
644 // Poll the future for readiness.
645 if let Some(f) = &mut self.readable {
646 let res = ready!(Pin::new(f).poll(cx));
647 self.readable = None;
648 res?;
649 }
650 }
651 }
652}
653
654impl AsyncWrite for TcpStream {
655 fn poll_write(
656 mut self: Pin<&mut Self>,
657 cx: &mut Context<'_>,
658 buf: &[u8],
659 ) -> Poll<io::Result<usize>> {
660 loop {
661 // Attempt the non-blocking operation.
662 match self.inner.get_ref().write(buf) {
663 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
664 res => {
665 self.writable = None;
666 return Poll::Ready(res);
667 }
668 }
669
670 // Initialize the future to wait for readiness.
671 if self.writable.is_none() {
672 self.writable = Some(WritableOwned::new(self.inner.clone()));
673 }
674
675 // Poll the future for readiness.
676 if let Some(f) = &mut self.writable {
677 let res = ready!(Pin::new(f).poll(cx));
678 self.writable = None;
679 res?;
680 }
681 }
682 }
683
684 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
685 loop {
686 // Attempt the non-blocking operation.
687 match self.inner.get_ref().flush() {
688 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
689 res => {
690 self.writable = None;
691 return Poll::Ready(res);
692 }
693 }
694
695 // Initialize the future to wait for readiness.
696 if self.writable.is_none() {
697 self.writable = Some(WritableOwned::new(self.inner.clone()));
698 }
699
700 // Poll the future for readiness.
701 if let Some(f) = &mut self.writable {
702 let res = ready!(Pin::new(f).poll(cx));
703 self.writable = None;
704 res?;
705 }
706 }
707 }
708
709 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
710 Poll::Ready(self.inner.get_ref().shutdown(Shutdown::Write))
711 }
712
713 fn poll_write_vectored(
714 mut self: Pin<&mut Self>,
715 cx: &mut Context<'_>,
716 bufs: &[IoSlice<'_>],
717 ) -> Poll<io::Result<usize>> {
718 loop {
719 // Attempt the non-blocking operation.
720 match self.inner.get_ref().write_vectored(bufs) {
721 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
722 res => {
723 self.writable = None;
724 return Poll::Ready(res);
725 }
726 }
727
728 // Initialize the future to wait for readiness.
729 if self.writable.is_none() {
730 self.writable = Some(WritableOwned::new(self.inner.clone()));
731 }
732
733 // Poll the future for readiness.
734 if let Some(f) = &mut self.writable {
735 let res = ready!(Pin::new(f).poll(cx));
736 self.writable = None;
737 res?;
738 }
739 }
740 }
741}