async_io/
lib.rs

1//! Async I/O and timers.
2//!
3//! This crate provides two tools:
4//!
5//! * [`Async`], an adapter for standard networking types (and [many other] types) to use in
6//!   async programs.
7//! * [`Timer`], a future or stream that emits timed events.
8//!
9//! For concrete async networking types built on top of this crate, see [`async-net`].
10//!
11//! [many other]: https://github.com/smol-rs/async-io/tree/master/examples
12//! [`async-net`]: https://docs.rs/async-net
13//!
14//! # Implementation
15//!
16//! The first time [`Async`] or [`Timer`] is used, a thread named "async-io" will be spawned.
17//! The purpose of this thread is to wait for I/O events reported by the operating system, and then
18//! wake appropriate futures blocked on I/O or timers when they can be resumed.
19//!
20//! To wait for the next I/O event, the "async-io" thread uses [epoll] on Linux/Android/illumos,
21//! [kqueue] on macOS/iOS/BSD, [event ports] on illumos/Solaris, and [IOCP] on Windows. That
22//! functionality is provided by the [`polling`] crate.
23//!
24//! However, note that you can also process I/O events and wake futures on any thread using the
25//! [`block_on()`] function. The "async-io" thread is therefore just a fallback mechanism
26//! processing I/O events in case no other threads are.
27//!
28//! [epoll]: https://en.wikipedia.org/wiki/Epoll
29//! [kqueue]: https://en.wikipedia.org/wiki/Kqueue
30//! [event ports]: https://illumos.org/man/port_create
31//! [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
32//! [`polling`]: https://docs.rs/polling
33//!
34//! # Examples
35//!
36//! Connect to `example.com:80`, or time out after 10 seconds.
37//!
38//! ```
39//! use async_io::{Async, Timer};
40//! use futures_lite::{future::FutureExt, io};
41//!
42//! use std::net::{TcpStream, ToSocketAddrs};
43//! use std::time::Duration;
44//!
45//! # futures_lite::future::block_on(async {
46//! let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
47//!
48//! let stream = Async::<TcpStream>::connect(addr).or(async {
49//!     Timer::after(Duration::from_secs(10)).await;
50//!     Err(io::ErrorKind::TimedOut.into())
51//! })
52//! .await?;
53//! # std::io::Result::Ok(()) });
54//! ```
55
56#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
57#![doc(
58    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
59)]
60#![doc(
61    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
62)]
63
64use std::future::Future;
65use std::io::{self, IoSlice, IoSliceMut, Read, Write};
66use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket};
67use std::pin::Pin;
68use std::sync::Arc;
69use std::task::{Context, Poll, Waker};
70use std::time::{Duration, Instant};
71
72#[cfg(unix)]
73use std::{
74    os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd},
75    os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
76    path::Path,
77};
78
79#[cfg(windows)]
80use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, OwnedSocket, RawSocket};
81
82use futures_io::{AsyncRead, AsyncWrite};
83use futures_lite::stream::{self, Stream};
84use futures_lite::{future, pin, ready};
85
86use rustix::io as rio;
87use rustix::net as rn;
88use rustix::net::addr::SocketAddrArg;
89
90use crate::reactor::{Reactor, Registration, Source};
91
92mod driver;
93mod reactor;
94
95pub mod os;
96
97pub use driver::block_on;
98pub use reactor::{Readable, ReadableOwned, Writable, WritableOwned};
99
100/// A future or stream that emits timed events.
101///
102/// Timers are futures that output a single [`Instant`] when they fire.
103///
104/// Timers are also streams that can output [`Instant`]s periodically.
105///
106/// # Precision
107///
108/// There is a limit on the maximum precision that a `Timer` can provide. This limit is
109/// dependent on the current platform; for instance, on Windows, the maximum precision is
110/// about 16 milliseconds. Because of this limit, the timer may sleep for longer than the
111/// requested duration. It will never sleep for less.
112///
113/// # Examples
114///
115/// Sleep for 1 second:
116///
117/// ```
118/// use async_io::Timer;
119/// use std::time::Duration;
120///
121/// # futures_lite::future::block_on(async {
122/// Timer::after(Duration::from_secs(1)).await;
123/// # });
124/// ```
125///
126/// Timeout after 1 second:
127///
128/// ```
129/// use async_io::Timer;
130/// use futures_lite::FutureExt;
131/// use std::time::Duration;
132///
133/// # futures_lite::future::block_on(async {
134/// let addrs = async_net::resolve("google.com:80")
135///     .or(async {
136///         Timer::after(Duration::from_secs(1)).await;
137///         Err(std::io::ErrorKind::TimedOut.into())
138///     })
139///     .await?;
140/// # std::io::Result::Ok(()) });
141/// ```
#[derive(Debug)]
pub struct Timer {
    /// This timer's reactor ID and the last waker that polled it.
    ///
    /// The ID is the value returned by the most recent `Reactor::insert_timer()` call made on
    /// behalf of this timer.
    ///
    /// When this field is set to `None`, this timer is not registered in the reactor. Note that
    /// the converse does not always hold: the `set_*` methods deliberately keep the waker around
    /// even on the overflow path where `when` becomes `None` and the reactor entry is removed.
    id_and_waker: Option<(usize, Waker)>,

    /// The next instant at which this timer fires.
    ///
    /// If this timer is a blank timer (one that never fires), this value is `None`. Otherwise it
    /// contains the next instant at which the timer must fire.
    when: Option<Instant>,

    /// The period added to `when` after each firing to compute the next deadline.
    ///
    /// One-shot timers store `Duration::MAX` here; once the addition no longer yields a
    /// representable `Instant`, `when` is reset to `None` and the timer stops firing.
    period: Duration,
}
159
160impl Timer {
161    /// Creates a timer that will never fire.
162    ///
163    /// # Examples
164    ///
165    /// This function may also be useful for creating a function with an optional timeout.
166    ///
167    /// ```
168    /// # futures_lite::future::block_on(async {
169    /// use async_io::Timer;
170    /// use futures_lite::prelude::*;
171    /// use std::time::Duration;
172    ///
173    /// async fn run_with_timeout(timeout: Option<Duration>) {
174    ///     let timer = timeout
175    ///         .map(|timeout| Timer::after(timeout))
176    ///         .unwrap_or_else(Timer::never);
177    ///
178    ///     run_lengthy_operation().or(timer).await;
179    /// }
180    /// # // Note that since a Timer as a Future returns an Instant,
181    /// # // this function needs to return an Instant to be used
182    /// # // in "or".
183    /// # async fn run_lengthy_operation() -> std::time::Instant {
184    /// #    std::time::Instant::now()
185    /// # }
186    ///
187    /// // Times out after 5 seconds.
188    /// run_with_timeout(Some(Duration::from_secs(5))).await;
189    /// // Does not time out.
190    /// run_with_timeout(None).await;
191    /// # });
192    /// ```
193    pub fn never() -> Timer {
194        Timer {
195            id_and_waker: None,
196            when: None,
197            period: Duration::MAX,
198        }
199    }
200
201    /// Creates a timer that emits an event once after the given duration of time.
202    ///
203    /// # Examples
204    ///
205    /// ```
206    /// use async_io::Timer;
207    /// use std::time::Duration;
208    ///
209    /// # futures_lite::future::block_on(async {
210    /// Timer::after(Duration::from_secs(1)).await;
211    /// # });
212    /// ```
213    pub fn after(duration: Duration) -> Timer {
214        Instant::now()
215            .checked_add(duration)
216            .map_or_else(Timer::never, Timer::at)
217    }
218
219    /// Creates a timer that emits an event once at the given time instant.
220    ///
221    /// # Examples
222    ///
223    /// ```
224    /// use async_io::Timer;
225    /// use std::time::{Duration, Instant};
226    ///
227    /// # futures_lite::future::block_on(async {
228    /// let now = Instant::now();
229    /// let when = now + Duration::from_secs(1);
230    /// Timer::at(when).await;
231    /// # });
232    /// ```
233    pub fn at(instant: Instant) -> Timer {
234        Timer::interval_at(instant, Duration::MAX)
235    }
236
237    /// Creates a timer that emits events periodically.
238    ///
239    /// # Examples
240    ///
241    /// ```
242    /// use async_io::Timer;
243    /// use futures_lite::StreamExt;
244    /// use std::time::{Duration, Instant};
245    ///
246    /// # futures_lite::future::block_on(async {
247    /// let period = Duration::from_secs(1);
248    /// Timer::interval(period).next().await;
249    /// # });
250    /// ```
251    pub fn interval(period: Duration) -> Timer {
252        Instant::now()
253            .checked_add(period)
254            .map_or_else(Timer::never, |at| Timer::interval_at(at, period))
255    }
256
257    /// Creates a timer that emits events periodically, starting at `start`.
258    ///
259    /// # Examples
260    ///
261    /// ```
262    /// use async_io::Timer;
263    /// use futures_lite::StreamExt;
264    /// use std::time::{Duration, Instant};
265    ///
266    /// # futures_lite::future::block_on(async {
267    /// let start = Instant::now();
268    /// let period = Duration::from_secs(1);
269    /// Timer::interval_at(start, period).next().await;
270    /// # });
271    /// ```
272    pub fn interval_at(start: Instant, period: Duration) -> Timer {
273        Timer {
274            id_and_waker: None,
275            when: Some(start),
276            period,
277        }
278    }
279
280    /// Indicates whether or not this timer will ever fire.
281    ///
282    /// [`never()`] will never fire, and timers created with [`after()`] or [`at()`] will fire
283    /// if the duration is not too large.
284    ///
285    /// [`never()`]: Timer::never()
286    /// [`after()`]: Timer::after()
287    /// [`at()`]: Timer::at()
288    ///
289    /// # Examples
290    ///
291    /// ```
292    /// # futures_lite::future::block_on(async {
293    /// use async_io::Timer;
294    /// use futures_lite::prelude::*;
295    /// use std::time::Duration;
296    ///
297    /// // `never` will never fire.
298    /// assert!(!Timer::never().will_fire());
299    ///
300    /// // `after` will fire if the duration is not too large.
301    /// assert!(Timer::after(Duration::from_secs(1)).will_fire());
302    /// assert!(!Timer::after(Duration::MAX).will_fire());
303    ///
304    /// // However, once an `after` timer has fired, it will never fire again.
305    /// let mut t = Timer::after(Duration::from_secs(1));
306    /// assert!(t.will_fire());
307    /// (&mut t).await;
308    /// assert!(!t.will_fire());
309    ///
310    /// // Interval timers will fire periodically.
311    /// let mut t = Timer::interval(Duration::from_secs(1));
312    /// assert!(t.will_fire());
313    /// t.next().await;
314    /// assert!(t.will_fire());
315    /// # });
316    /// ```
317    #[inline]
318    pub fn will_fire(&self) -> bool {
319        self.when.is_some()
320    }
321
322    /// Sets the timer to emit an event once after the given duration of time.
323    ///
324    /// Note that resetting a timer is different from creating a new timer because
325    /// [`set_after()`][`Timer::set_after()`] does not remove the waker associated with the task
326    /// that is polling the timer.
327    ///
328    /// # Examples
329    ///
330    /// ```
331    /// use async_io::Timer;
332    /// use std::time::Duration;
333    ///
334    /// # futures_lite::future::block_on(async {
335    /// let mut t = Timer::after(Duration::from_secs(1));
336    /// t.set_after(Duration::from_millis(100));
337    /// # });
338    /// ```
339    pub fn set_after(&mut self, duration: Duration) {
340        match Instant::now().checked_add(duration) {
341            Some(instant) => self.set_at(instant),
342            None => {
343                // Overflow to never going off.
344                self.clear();
345                self.when = None;
346            }
347        }
348    }
349
350    /// Sets the timer to emit an event once at the given time instant.
351    ///
352    /// Note that resetting a timer is different from creating a new timer because
353    /// [`set_at()`][`Timer::set_at()`] does not remove the waker associated with the task
354    /// that is polling the timer.
355    ///
356    /// # Examples
357    ///
358    /// ```
359    /// use async_io::Timer;
360    /// use std::time::{Duration, Instant};
361    ///
362    /// # futures_lite::future::block_on(async {
363    /// let mut t = Timer::after(Duration::from_secs(1));
364    ///
365    /// let now = Instant::now();
366    /// let when = now + Duration::from_secs(1);
367    /// t.set_at(when);
368    /// # });
369    /// ```
370    pub fn set_at(&mut self, instant: Instant) {
371        self.clear();
372
373        // Update the timeout.
374        self.when = Some(instant);
375
376        if let Some((id, waker)) = self.id_and_waker.as_mut() {
377            // Re-register the timer with the new timeout.
378            *id = Reactor::get().insert_timer(instant, waker);
379        }
380    }
381
382    /// Sets the timer to emit events periodically.
383    ///
384    /// Note that resetting a timer is different from creating a new timer because
385    /// [`set_interval()`][`Timer::set_interval()`] does not remove the waker associated with the
386    /// task that is polling the timer.
387    ///
388    /// # Examples
389    ///
390    /// ```
391    /// use async_io::Timer;
392    /// use futures_lite::StreamExt;
393    /// use std::time::{Duration, Instant};
394    ///
395    /// # futures_lite::future::block_on(async {
396    /// let mut t = Timer::after(Duration::from_secs(1));
397    ///
398    /// let period = Duration::from_secs(2);
399    /// t.set_interval(period);
400    /// # });
401    /// ```
402    pub fn set_interval(&mut self, period: Duration) {
403        match Instant::now().checked_add(period) {
404            Some(instant) => self.set_interval_at(instant, period),
405            None => {
406                // Overflow to never going off.
407                self.clear();
408                self.when = None;
409            }
410        }
411    }
412
413    /// Sets the timer to emit events periodically, starting at `start`.
414    ///
415    /// Note that resetting a timer is different from creating a new timer because
416    /// [`set_interval_at()`][`Timer::set_interval_at()`] does not remove the waker associated with
417    /// the task that is polling the timer.
418    ///
419    /// # Examples
420    ///
421    /// ```
422    /// use async_io::Timer;
423    /// use futures_lite::StreamExt;
424    /// use std::time::{Duration, Instant};
425    ///
426    /// # futures_lite::future::block_on(async {
427    /// let mut t = Timer::after(Duration::from_secs(1));
428    ///
429    /// let start = Instant::now();
430    /// let period = Duration::from_secs(2);
431    /// t.set_interval_at(start, period);
432    /// # });
433    /// ```
434    pub fn set_interval_at(&mut self, start: Instant, period: Duration) {
435        self.clear();
436
437        self.when = Some(start);
438        self.period = period;
439
440        if let Some((id, waker)) = self.id_and_waker.as_mut() {
441            // Re-register the timer with the new timeout.
442            *id = Reactor::get().insert_timer(start, waker);
443        }
444    }
445
446    /// Helper function to clear the current timer.
447    fn clear(&mut self) {
448        if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.as_ref()) {
449            // Deregister the timer from the reactor.
450            Reactor::get().remove_timer(when, *id);
451        }
452    }
453}
454
455impl Drop for Timer {
456    fn drop(&mut self) {
457        if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.take()) {
458            // Deregister the timer from the reactor.
459            Reactor::get().remove_timer(when, id);
460        }
461    }
462}
463
464impl Future for Timer {
465    type Output = Instant;
466
467    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
468        match self.poll_next(cx) {
469            Poll::Ready(Some(when)) => Poll::Ready(when),
470            Poll::Pending => Poll::Pending,
471            Poll::Ready(None) => unreachable!(),
472        }
473    }
474}
475
476impl Stream for Timer {
477    type Item = Instant;
478
479    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
480        let this = self.get_mut();
481
482        if let Some(ref mut when) = this.when {
483            // Check if the timer has already fired.
484            if Instant::now() >= *when {
485                if let Some((id, _)) = this.id_and_waker.take() {
486                    // Deregister the timer from the reactor.
487                    Reactor::get().remove_timer(*when, id);
488                }
489                let result_time = *when;
490                if let Some(next) = (*when).checked_add(this.period) {
491                    *when = next;
492                    // Register the timer in the reactor.
493                    let id = Reactor::get().insert_timer(next, cx.waker());
494                    this.id_and_waker = Some((id, cx.waker().clone()));
495                } else {
496                    this.when = None;
497                }
498                return Poll::Ready(Some(result_time));
499            } else {
500                match &this.id_and_waker {
501                    None => {
502                        // Register the timer in the reactor.
503                        let id = Reactor::get().insert_timer(*when, cx.waker());
504                        this.id_and_waker = Some((id, cx.waker().clone()));
505                    }
506                    Some((id, w)) if !w.will_wake(cx.waker()) => {
507                        // Deregister the timer from the reactor to remove the old waker.
508                        Reactor::get().remove_timer(*when, *id);
509
510                        // Register the timer in the reactor with the new waker.
511                        let id = Reactor::get().insert_timer(*when, cx.waker());
512                        this.id_and_waker = Some((id, cx.waker().clone()));
513                    }
514                    Some(_) => {}
515                }
516            }
517        }
518
519        Poll::Pending
520    }
521}
522
523/// Async adapter for I/O types.
524///
525/// This type puts an I/O handle into non-blocking mode, registers it in
526/// [epoll]/[kqueue]/[event ports]/[IOCP], and then provides an async interface for it.
527///
528/// [epoll]: https://en.wikipedia.org/wiki/Epoll
529/// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
530/// [event ports]: https://illumos.org/man/port_create
531/// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
532///
533/// # Caveats
534///
535/// [`Async`] is a low-level primitive, and as such it comes with some caveats.
536///
537/// For higher-level primitives built on top of [`Async`], look into [`async-net`] or
538/// [`async-process`] (on Unix).
539///
540/// The most notable caveat is that it is unsafe to access the inner I/O source mutably
/// using this primitive. Traits like [`AsyncRead`] and [`AsyncWrite`] are not implemented by
542/// default unless it is guaranteed that the resource won't be invalidated by reading or writing.
543/// See the [`IoSafe`] trait for more information.
544///
545/// [`async-net`]: https://github.com/smol-rs/async-net
546/// [`async-process`]: https://github.com/smol-rs/async-process
547/// [`AsyncRead`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncRead.html
548/// [`AsyncWrite`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncWrite.html
549///
550/// ### Supported types
551///
552/// [`Async`] supports all networking types, as well as some OS-specific file descriptors like
553/// [timerfd] and [inotify].
554///
555/// However, do not use [`Async`] with types like [`File`][`std::fs::File`],
556/// [`Stdin`][`std::io::Stdin`], [`Stdout`][`std::io::Stdout`], or [`Stderr`][`std::io::Stderr`]
557/// because all operating systems have issues with them when put in non-blocking mode.
558///
559/// [timerfd]: https://github.com/smol-rs/async-io/blob/master/examples/linux-timerfd.rs
560/// [inotify]: https://github.com/smol-rs/async-io/blob/master/examples/linux-inotify.rs
561///
562/// ### Concurrent I/O
563///
564/// Note that [`&Async<T>`][`Async`] implements [`AsyncRead`] and [`AsyncWrite`] if `&T`
565/// implements those traits, which means tasks can concurrently read and write using shared
566/// references.
567///
/// But there is a catch: only one task can read at a time, and only one task can write at a time. It
569/// is okay to have two tasks where one is reading and the other is writing at the same time, but
570/// it is not okay to have two tasks reading at the same time or writing at the same time. If you
571/// try to do that, conflicting tasks will just keep waking each other in turn, thus wasting CPU
572/// time.
573///
574/// Besides [`AsyncRead`] and [`AsyncWrite`], this caveat also applies to
575/// [`poll_readable()`][`Async::poll_readable()`] and
576/// [`poll_writable()`][`Async::poll_writable()`].
577///
578/// However, any number of tasks can be concurrently calling other methods like
579/// [`readable()`][`Async::readable()`] or [`read_with()`][`Async::read_with()`].
580///
581/// ### Closing
582///
583/// Closing the write side of [`Async`] with [`close()`][`futures_lite::AsyncWriteExt::close()`]
584/// simply flushes. If you want to shutdown a TCP or Unix socket, use
585/// [`Shutdown`][`std::net::Shutdown`].
586///
587/// # Examples
588///
589/// Connect to a server and echo incoming messages back to the server:
590///
591/// ```no_run
592/// use async_io::Async;
593/// use futures_lite::io;
594/// use std::net::TcpStream;
595///
596/// # futures_lite::future::block_on(async {
597/// // Connect to a local server.
598/// let stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 8000)).await?;
599///
600/// // Echo all messages from the read side of the stream into the write side.
601/// io::copy(&stream, &stream).await?;
602/// # std::io::Result::Ok(()) });
603/// ```
604///
605/// You can use either predefined async methods or wrap blocking I/O operations in
606/// [`Async::read_with()`], [`Async::read_with_mut()`], [`Async::write_with()`], and
607/// [`Async::write_with_mut()`]:
608///
609/// ```no_run
610/// use async_io::Async;
611/// use std::net::TcpListener;
612///
613/// # futures_lite::future::block_on(async {
614/// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
615///
616/// // These two lines are equivalent:
617/// let (stream, addr) = listener.accept().await?;
618/// let (stream, addr) = listener.read_with(|inner| inner.accept()).await?;
619/// # std::io::Result::Ok(()) });
620/// ```
#[derive(Debug)]
pub struct Async<T> {
    /// A source registered in the reactor.
    ///
    /// This is the handle returned by `Reactor::insert_io()` when the `Async` is created, and
    /// the one passed to `Reactor::remove_io()` by [`Async::into_inner()`].
    source: Arc<Source>,

    /// The inner I/O handle.
    ///
    /// Constructed as `Some` and only taken out by [`Async::into_inner()`], which consumes the
    /// `Async`; the accessors therefore unwrap it.
    io: Option<T>,
}
629
// `Async<T>` is declared `Unpin` unconditionally, i.e. even when `T` itself is not `Unpin`.
impl<T> Unpin for Async<T> {}
631
632#[cfg(unix)]
633impl<T: AsFd> Async<T> {
634    /// Creates an async I/O handle.
635    ///
636    /// This method will put the handle in non-blocking mode and register it in
637    /// [epoll]/[kqueue]/[event ports]/[IOCP].
638    ///
639    /// On Unix systems, the handle must implement `AsFd`, while on Windows it must implement
640    /// `AsSocket`.
641    ///
642    /// [epoll]: https://en.wikipedia.org/wiki/Epoll
643    /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
644    /// [event ports]: https://illumos.org/man/port_create
645    /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
646    ///
647    /// # Examples
648    ///
649    /// ```
650    /// use async_io::Async;
651    /// use std::net::{SocketAddr, TcpListener};
652    ///
653    /// # futures_lite::future::block_on(async {
654    /// let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))?;
655    /// let listener = Async::new(listener)?;
656    /// # std::io::Result::Ok(()) });
657    /// ```
658    pub fn new(io: T) -> io::Result<Async<T>> {
659        // Put the file descriptor in non-blocking mode.
660        set_nonblocking(io.as_fd())?;
661
662        Self::new_nonblocking(io)
663    }
664
665    /// Creates an async I/O handle without setting it to non-blocking mode.
666    ///
667    /// This method will register the handle in [epoll]/[kqueue]/[event ports]/[IOCP].
668    ///
669    /// On Unix systems, the handle must implement `AsFd`, while on Windows it must implement
670    /// `AsSocket`.
671    ///
672    /// [epoll]: https://en.wikipedia.org/wiki/Epoll
673    /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
674    /// [event ports]: https://illumos.org/man/port_create
675    /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
676    ///
677    /// # Caveats
678    ///
679    /// The caller should ensure that the handle is set to non-blocking mode or that it is okay if
680    /// it is not set. If not set to non-blocking mode, I/O operations may block the current thread
681    /// and cause a deadlock in an asynchronous context.
682    pub fn new_nonblocking(io: T) -> io::Result<Async<T>> {
683        // SAFETY: It is impossible to drop the I/O source while it is registered through
684        // this type.
685        let registration = unsafe { Registration::new(io.as_fd()) };
686
687        Ok(Async {
688            source: Reactor::get().insert_io(registration)?,
689            io: Some(io),
690        })
691    }
692}
693
694#[cfg(unix)]
695impl<T: AsRawFd> AsRawFd for Async<T> {
696    fn as_raw_fd(&self) -> RawFd {
697        self.get_ref().as_raw_fd()
698    }
699}
700
701#[cfg(unix)]
702impl<T: AsFd> AsFd for Async<T> {
703    fn as_fd(&self) -> BorrowedFd<'_> {
704        self.get_ref().as_fd()
705    }
706}
707
708#[cfg(unix)]
709impl<T: AsFd + From<OwnedFd>> TryFrom<OwnedFd> for Async<T> {
710    type Error = io::Error;
711
712    fn try_from(value: OwnedFd) -> Result<Self, Self::Error> {
713        Async::new(value.into())
714    }
715}
716
717#[cfg(unix)]
718impl<T: Into<OwnedFd>> TryFrom<Async<T>> for OwnedFd {
719    type Error = io::Error;
720
721    fn try_from(value: Async<T>) -> Result<Self, Self::Error> {
722        value.into_inner().map(Into::into)
723    }
724}
725
726#[cfg(windows)]
727impl<T: AsSocket> Async<T> {
728    /// Creates an async I/O handle.
729    ///
730    /// This method will put the handle in non-blocking mode and register it in
731    /// [epoll]/[kqueue]/[event ports]/[IOCP].
732    ///
733    /// On Unix systems, the handle must implement `AsFd`, while on Windows it must implement
734    /// `AsSocket`.
735    ///
736    /// [epoll]: https://en.wikipedia.org/wiki/Epoll
737    /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
738    /// [event ports]: https://illumos.org/man/port_create
739    /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
740    ///
741    /// # Examples
742    ///
743    /// ```
744    /// use async_io::Async;
745    /// use std::net::{SocketAddr, TcpListener};
746    ///
747    /// # futures_lite::future::block_on(async {
748    /// let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))?;
749    /// let listener = Async::new(listener)?;
750    /// # std::io::Result::Ok(()) });
751    /// ```
752    pub fn new(io: T) -> io::Result<Async<T>> {
753        // Put the socket in non-blocking mode.
754        set_nonblocking(io.as_socket())?;
755
756        Self::new_nonblocking(io)
757    }
758
759    /// Creates an async I/O handle without setting it to non-blocking mode.
760    ///
761    /// This method will register the handle in [epoll]/[kqueue]/[event ports]/[IOCP].
762    ///
763    /// On Unix systems, the handle must implement `AsFd`, while on Windows it must implement
764    /// `AsSocket`.
765    ///
766    /// [epoll]: https://en.wikipedia.org/wiki/Epoll
767    /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
768    /// [event ports]: https://illumos.org/man/port_create
769    /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
770    ///
771    /// # Caveats
772    ///
773    /// The caller should ensure that the handle is set to non-blocking mode or that it is okay if
774    /// it is not set. If not set to non-blocking mode, I/O operations may block the current thread
775    /// and cause a deadlock in an asynchronous context.
776    pub fn new_nonblocking(io: T) -> io::Result<Async<T>> {
777        // Create the registration.
778        //
779        // SAFETY: It is impossible to drop the I/O source while it is registered through
780        // this type.
781        let registration = unsafe { Registration::new(io.as_socket()) };
782
783        Ok(Async {
784            source: Reactor::get().insert_io(registration)?,
785            io: Some(io),
786        })
787    }
788}
789
#[cfg(windows)]
impl<T: AsRawSocket> AsRawSocket for Async<T> {
    // Reports the raw socket of the wrapped handle.
    fn as_raw_socket(&self) -> RawSocket {
        let inner: &T = self.get_ref();
        inner.as_raw_socket()
    }
}
796
#[cfg(windows)]
impl<T: AsSocket> AsSocket for Async<T> {
    // Borrows the socket of the wrapped handle.
    fn as_socket(&self) -> BorrowedSocket<'_> {
        let inner: &T = self.get_ref();
        inner.as_socket()
    }
}
803
#[cfg(windows)]
impl<T: AsSocket + From<OwnedSocket>> TryFrom<OwnedSocket> for Async<T> {
    type Error = io::Error;

    // Converts the socket into `T`, then wraps it via `Async::new()`.
    fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> {
        let handle = T::from(value);
        Self::new(handle)
    }
}
812
#[cfg(windows)]
impl<T: Into<OwnedSocket>> TryFrom<Async<T>> for OwnedSocket {
    type Error = io::Error;

    // Unwraps the inner handle via `into_inner()` and converts it into an owned socket.
    fn try_from(value: Async<T>) -> Result<Self, Self::Error> {
        let handle: T = value.into_inner()?;
        Ok(handle.into())
    }
}
821
822impl<T> Async<T> {
823    /// Gets a reference to the inner I/O handle.
824    ///
825    /// # Examples
826    ///
827    /// ```
828    /// use async_io::Async;
829    /// use std::net::TcpListener;
830    ///
831    /// # futures_lite::future::block_on(async {
832    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
833    /// let inner = listener.get_ref();
834    /// # std::io::Result::Ok(()) });
835    /// ```
836    pub fn get_ref(&self) -> &T {
837        self.io.as_ref().unwrap()
838    }
839
840    /// Gets a mutable reference to the inner I/O handle.
841    ///
842    /// # Safety
843    ///
844    /// The underlying I/O source must not be dropped using this function.
845    ///
846    /// # Examples
847    ///
848    /// ```
849    /// use async_io::Async;
850    /// use std::net::TcpListener;
851    ///
852    /// # futures_lite::future::block_on(async {
853    /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
854    /// let inner = unsafe { listener.get_mut() };
855    /// # std::io::Result::Ok(()) });
856    /// ```
857    pub unsafe fn get_mut(&mut self) -> &mut T {
858        self.io.as_mut().unwrap()
859    }
860
861    /// Unwraps the inner I/O handle.
862    ///
863    /// This method will **not** put the I/O handle back into blocking mode.
864    ///
865    /// # Examples
866    ///
867    /// ```
868    /// use async_io::Async;
869    /// use std::net::TcpListener;
870    ///
871    /// # futures_lite::future::block_on(async {
872    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
873    /// let inner = listener.into_inner()?;
874    ///
875    /// // Put the listener back into blocking mode.
876    /// inner.set_nonblocking(false)?;
877    /// # std::io::Result::Ok(()) });
878    /// ```
879    pub fn into_inner(mut self) -> io::Result<T> {
880        let io = self.io.take().unwrap();
881        Reactor::get().remove_io(&self.source)?;
882        Ok(io)
883    }
884
    /// Waits until the I/O handle is readable.
    ///
    /// The returned future borrows this [`Async`] and completes when a read operation on the
    /// I/O handle wouldn't block.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use async_io::Async;
    /// use std::net::TcpListener;
    ///
    /// # futures_lite::future::block_on(async {
    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
    ///
    /// // Wait until a client can be accepted.
    /// listener.readable().await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn readable(&self) -> Readable<'_, T> {
        Source::readable(self)
    }
905
    /// Waits until the I/O handle is readable.
    ///
    /// This method completes when a read operation on this I/O handle wouldn't block.
    ///
    /// Unlike `readable()`, this takes the [`Async`] behind an `Arc`, so the returned value is
    /// not tied to a borrow of `self`.
    pub fn readable_owned(self: Arc<Self>) -> ReadableOwned<T> {
        Source::readable_owned(self)
    }
912
    /// Waits until the I/O handle is writable.
    ///
    /// The returned future borrows this [`Async`] and completes when a write operation on the
    /// I/O handle wouldn't block.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_io::Async;
    /// use std::net::{TcpStream, ToSocketAddrs};
    ///
    /// # futures_lite::future::block_on(async {
    /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
    /// let stream = Async::<TcpStream>::connect(addr).await?;
    ///
    /// // Wait until the stream is writable.
    /// stream.writable().await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn writable(&self) -> Writable<'_, T> {
        Source::writable(self)
    }
934
    /// Waits until the I/O handle is writable.
    ///
    /// This method completes when a write operation on this I/O handle wouldn't block.
    ///
    /// Unlike `writable()`, this takes the [`Async`] behind an `Arc`, so the returned value is
    /// not tied to a borrow of `self`.
    pub fn writable_owned(self: Arc<Self>) -> WritableOwned<T> {
        Source::writable_owned(self)
    }
941
    /// Polls the I/O handle for readability.
    ///
    /// When this method returns [`Poll::Ready`], that means the OS has delivered an event
    /// indicating readability since the last time this task has called the method and received
    /// [`Poll::Pending`].
    ///
    /// # Caveats
    ///
    /// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
    /// will just keep waking each other in turn, thus wasting CPU time.
    ///
    /// Note that the [`AsyncRead`] implementation for [`Async`] also uses this method, so the
    /// same caveat applies when mixing the two from different tasks.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use async_io::Async;
    /// use futures_lite::future;
    /// use std::net::TcpListener;
    ///
    /// # futures_lite::future::block_on(async {
    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
    ///
    /// // Wait until a client can be accepted.
    /// future::poll_fn(|cx| listener.poll_readable(cx)).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.source.poll_readable(cx)
    }
972
    /// Polls the I/O handle for writability.
    ///
    /// When this method returns [`Poll::Ready`], that means the OS has delivered an event
    /// indicating writability since the last time this task has called the method and received
    /// [`Poll::Pending`].
    ///
    /// # Caveats
    ///
    /// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
    /// will just keep waking each other in turn, thus wasting CPU time.
    ///
    /// Note that the [`AsyncWrite`] implementation for [`Async`] also uses this method, so the
    /// same caveat applies when mixing the two from different tasks.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_io::Async;
    /// use futures_lite::future;
    /// use std::net::{TcpStream, ToSocketAddrs};
    ///
    /// # futures_lite::future::block_on(async {
    /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
    /// let stream = Async::<TcpStream>::connect(addr).await?;
    ///
    /// // Wait until the stream is writable.
    /// future::poll_fn(|cx| stream.poll_writable(cx)).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.source.poll_writable(cx)
    }
1004
1005    /// Performs a read operation asynchronously.
1006    ///
1007    /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
1008    /// invokes the `op` closure in a loop until it succeeds or returns an error other than
1009    /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
1010    /// sends a notification that the I/O handle is readable.
1011    ///
1012    /// The closure receives a shared reference to the I/O handle.
1013    ///
1014    /// # Examples
1015    ///
1016    /// ```no_run
1017    /// use async_io::Async;
1018    /// use std::net::TcpListener;
1019    ///
1020    /// # futures_lite::future::block_on(async {
1021    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
1022    ///
1023    /// // Accept a new client asynchronously.
1024    /// let (stream, addr) = listener.read_with(|l| l.accept()).await?;
1025    /// # std::io::Result::Ok(()) });
1026    /// ```
1027    pub async fn read_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
1028        let mut op = op;
1029        loop {
1030            match op(self.get_ref()) {
1031                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1032                res => return res,
1033            }
1034            optimistic(self.readable()).await?;
1035        }
1036    }
1037
1038    /// Performs a read operation asynchronously.
1039    ///
1040    /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
1041    /// invokes the `op` closure in a loop until it succeeds or returns an error other than
1042    /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
1043    /// sends a notification that the I/O handle is readable.
1044    ///
1045    /// The closure receives a mutable reference to the I/O handle.
1046    ///
1047    /// # Safety
1048    ///
1049    /// In the closure, the underlying I/O source must not be dropped.
1050    ///
1051    /// # Examples
1052    ///
1053    /// ```no_run
1054    /// use async_io::Async;
1055    /// use std::net::TcpListener;
1056    ///
1057    /// # futures_lite::future::block_on(async {
1058    /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
1059    ///
1060    /// // Accept a new client asynchronously.
1061    /// let (stream, addr) = unsafe { listener.read_with_mut(|l| l.accept()).await? };
1062    /// # std::io::Result::Ok(()) });
1063    /// ```
1064    pub async unsafe fn read_with_mut<R>(
1065        &mut self,
1066        op: impl FnMut(&mut T) -> io::Result<R>,
1067    ) -> io::Result<R> {
1068        let mut op = op;
1069        loop {
1070            match op(self.get_mut()) {
1071                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1072                res => return res,
1073            }
1074            optimistic(self.readable()).await?;
1075        }
1076    }
1077
1078    /// Performs a write operation asynchronously.
1079    ///
1080    /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
1081    /// invokes the `op` closure in a loop until it succeeds or returns an error other than
1082    /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
1083    /// sends a notification that the I/O handle is writable.
1084    ///
1085    /// The closure receives a shared reference to the I/O handle.
1086    ///
1087    /// # Examples
1088    ///
1089    /// ```no_run
1090    /// use async_io::Async;
1091    /// use std::net::UdpSocket;
1092    ///
1093    /// # futures_lite::future::block_on(async {
1094    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1095    /// socket.get_ref().connect("127.0.0.1:9000")?;
1096    ///
1097    /// let msg = b"hello";
1098    /// let len = socket.write_with(|s| s.send(msg)).await?;
1099    /// # std::io::Result::Ok(()) });
1100    /// ```
1101    pub async fn write_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
1102        let mut op = op;
1103        loop {
1104            match op(self.get_ref()) {
1105                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1106                res => return res,
1107            }
1108            optimistic(self.writable()).await?;
1109        }
1110    }
1111
1112    /// Performs a write operation asynchronously.
1113    ///
1114    /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
1115    /// invokes the `op` closure in a loop until it succeeds or returns an error other than
1116    /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
1117    /// sends a notification that the I/O handle is writable.
1118    ///
1119    /// # Safety
1120    ///
1121    /// The closure receives a mutable reference to the I/O handle. In the closure, the underlying
1122    /// I/O source must not be dropped.
1123    ///
1124    /// # Examples
1125    ///
1126    /// ```no_run
1127    /// use async_io::Async;
1128    /// use std::net::UdpSocket;
1129    ///
1130    /// # futures_lite::future::block_on(async {
1131    /// let mut socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1132    /// socket.get_ref().connect("127.0.0.1:9000")?;
1133    ///
1134    /// let msg = b"hello";
1135    /// let len = unsafe { socket.write_with_mut(|s| s.send(msg)).await? };
1136    /// # std::io::Result::Ok(()) });
1137    /// ```
1138    pub async unsafe fn write_with_mut<R>(
1139        &mut self,
1140        op: impl FnMut(&mut T) -> io::Result<R>,
1141    ) -> io::Result<R> {
1142        let mut op = op;
1143        loop {
1144            match op(self.get_mut()) {
1145                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1146                res => return res,
1147            }
1148            optimistic(self.writable()).await?;
1149        }
1150    }
1151}
1152
1153impl<T> AsRef<T> for Async<T> {
1154    fn as_ref(&self) -> &T {
1155        self.get_ref()
1156    }
1157}
1158
1159impl<T> Drop for Async<T> {
1160    fn drop(&mut self) {
1161        if self.io.is_some() {
1162            // Deregister and ignore errors because destructors should not panic.
1163            Reactor::get().remove_io(&self.source).ok();
1164
1165            // Drop the I/O handle to close it.
1166            self.io.take();
1167        }
1168    }
1169}
1170
/// Types whose I/O trait implementations do not drop the underlying I/O source.
///
/// The resource contained inside of the [`Async`] must not be invalidated. This invalidation can
/// happen if the inner resource (the [`TcpStream`], [`UnixListener`] or other `T`) is moved out
/// and dropped before the [`Async`]. Because of this, functions that grant mutable access to
/// the inner type are unsafe, as there is no way to guarantee that the source won't be dropped
/// and a dangling handle won't be left behind.
///
/// Unfortunately this extends to implementations of [`Read`] and [`Write`]. Since methods on those
/// traits take `&mut`, there is no guarantee that the implementor of those traits won't move the
/// source out while the method is being run.
///
/// This trait is an antidote to this predicament. By implementing this trait, the user pledges
/// that using any I/O traits won't destroy the source. This way, [`Async`] can implement the
/// `async` version of these I/O traits, like [`AsyncRead`] and [`AsyncWrite`].
///
/// # Safety
///
/// Any I/O trait implementations for this type must not drop the underlying I/O source. Traits
/// affected by this trait include [`Read`], [`Write`], [`Seek`] and [`BufRead`].
///
/// This trait is implemented by default on top of `libstd` types. In addition, it is implemented
/// for immutable reference types, as it is impossible to invalidate any outstanding references
/// while holding an immutable reference, even with interior mutability. Rust's current pinning
/// system relies on similar guarantees, so this approach is believed to be robust.
///
/// [`BufRead`]: https://doc.rust-lang.org/std/io/trait.BufRead.html
/// [`Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
/// [`Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html
/// [`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
///
/// [`AsyncRead`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncRead.html
/// [`AsyncWrite`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncWrite.html
pub unsafe trait IoSafe {}
1205
/// Shared references are always [`IoSafe`]: the referent can't be moved out or dropped through
/// a `&T`.
///
/// The worst thing that can happen is that external state is used to change what kind of pointer
/// `as_fd()` returns. For instance:
///
/// ```
/// # #[cfg(unix)] {
/// use std::cell::Cell;
/// use std::net::TcpStream;
/// use std::os::unix::io::{AsFd, BorrowedFd};
///
/// struct Bar {
///     flag: Cell<bool>,
///     a: TcpStream,
///     b: TcpStream
/// }
///
/// impl AsFd for Bar {
///     fn as_fd(&self) -> BorrowedFd<'_> {
///         if self.flag.replace(!self.flag.get()) {
///             self.a.as_fd()
///         } else {
///             self.b.as_fd()
///         }
///     }
/// }
/// # }
/// ```
///
/// We solve this problem by only calling `as_fd()` once to get the original source. Implementations
/// like this are considered buggy (but not unsound) and are thus not really supported by `async-io`.
unsafe impl<T: ?Sized> IoSafe for &T {}
1238
// Can be implemented on top of libstd types.

// Files, standard streams (and their locked variants), TCP streams and child process pipes.
unsafe impl IoSafe for std::fs::File {}
unsafe impl IoSafe for std::io::Stderr {}
unsafe impl IoSafe for std::io::Stdin {}
unsafe impl IoSafe for std::io::Stdout {}
unsafe impl IoSafe for std::io::StderrLock<'_> {}
unsafe impl IoSafe for std::io::StdinLock<'_> {}
unsafe impl IoSafe for std::io::StdoutLock<'_> {}
unsafe impl IoSafe for std::net::TcpStream {}
unsafe impl IoSafe for std::process::ChildStdin {}
unsafe impl IoSafe for std::process::ChildStdout {}
unsafe impl IoSafe for std::process::ChildStderr {}

// Unix domain socket streams only exist on Unix targets.
#[cfg(unix)]
unsafe impl IoSafe for std::os::unix::net::UnixStream {}

// Wrappers and pointer-like types are `IoSafe` whenever the type they wrap is.
unsafe impl<T: IoSafe + Read> IoSafe for std::io::BufReader<T> {}
unsafe impl<T: IoSafe + Write> IoSafe for std::io::BufWriter<T> {}
unsafe impl<T: IoSafe + Write> IoSafe for std::io::LineWriter<T> {}
unsafe impl<T: IoSafe + ?Sized> IoSafe for &mut T {}
unsafe impl<T: IoSafe + ?Sized> IoSafe for Box<T> {}
unsafe impl<T: Clone + IoSafe> IoSafe for std::borrow::Cow<'_, T> {}
1261
1262impl<T: IoSafe + Read> AsyncRead for Async<T> {
1263    fn poll_read(
1264        mut self: Pin<&mut Self>,
1265        cx: &mut Context<'_>,
1266        buf: &mut [u8],
1267    ) -> Poll<io::Result<usize>> {
1268        loop {
1269            match unsafe { (*self).get_mut() }.read(buf) {
1270                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1271                res => return Poll::Ready(res),
1272            }
1273            ready!(self.poll_readable(cx))?;
1274        }
1275    }
1276
1277    fn poll_read_vectored(
1278        mut self: Pin<&mut Self>,
1279        cx: &mut Context<'_>,
1280        bufs: &mut [IoSliceMut<'_>],
1281    ) -> Poll<io::Result<usize>> {
1282        loop {
1283            match unsafe { (*self).get_mut() }.read_vectored(bufs) {
1284                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1285                res => return Poll::Ready(res),
1286            }
1287            ready!(self.poll_readable(cx))?;
1288        }
1289    }
1290}
1291
1292// Since this is through a reference, we can't mutate the inner I/O source.
1293// Therefore this is safe!
1294impl<T> AsyncRead for &Async<T>
1295where
1296    for<'a> &'a T: Read,
1297{
1298    fn poll_read(
1299        self: Pin<&mut Self>,
1300        cx: &mut Context<'_>,
1301        buf: &mut [u8],
1302    ) -> Poll<io::Result<usize>> {
1303        loop {
1304            match (*self).get_ref().read(buf) {
1305                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1306                res => return Poll::Ready(res),
1307            }
1308            ready!(self.poll_readable(cx))?;
1309        }
1310    }
1311
1312    fn poll_read_vectored(
1313        self: Pin<&mut Self>,
1314        cx: &mut Context<'_>,
1315        bufs: &mut [IoSliceMut<'_>],
1316    ) -> Poll<io::Result<usize>> {
1317        loop {
1318            match (*self).get_ref().read_vectored(bufs) {
1319                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1320                res => return Poll::Ready(res),
1321            }
1322            ready!(self.poll_readable(cx))?;
1323        }
1324    }
1325}
1326
1327impl<T: IoSafe + Write> AsyncWrite for Async<T> {
1328    fn poll_write(
1329        mut self: Pin<&mut Self>,
1330        cx: &mut Context<'_>,
1331        buf: &[u8],
1332    ) -> Poll<io::Result<usize>> {
1333        loop {
1334            match unsafe { (*self).get_mut() }.write(buf) {
1335                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1336                res => return Poll::Ready(res),
1337            }
1338            ready!(self.poll_writable(cx))?;
1339        }
1340    }
1341
1342    fn poll_write_vectored(
1343        mut self: Pin<&mut Self>,
1344        cx: &mut Context<'_>,
1345        bufs: &[IoSlice<'_>],
1346    ) -> Poll<io::Result<usize>> {
1347        loop {
1348            match unsafe { (*self).get_mut() }.write_vectored(bufs) {
1349                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1350                res => return Poll::Ready(res),
1351            }
1352            ready!(self.poll_writable(cx))?;
1353        }
1354    }
1355
1356    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1357        loop {
1358            match unsafe { (*self).get_mut() }.flush() {
1359                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1360                res => return Poll::Ready(res),
1361            }
1362            ready!(self.poll_writable(cx))?;
1363        }
1364    }
1365
1366    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1367        self.poll_flush(cx)
1368    }
1369}
1370
1371impl<T> AsyncWrite for &Async<T>
1372where
1373    for<'a> &'a T: Write,
1374{
1375    fn poll_write(
1376        self: Pin<&mut Self>,
1377        cx: &mut Context<'_>,
1378        buf: &[u8],
1379    ) -> Poll<io::Result<usize>> {
1380        loop {
1381            match (*self).get_ref().write(buf) {
1382                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1383                res => return Poll::Ready(res),
1384            }
1385            ready!(self.poll_writable(cx))?;
1386        }
1387    }
1388
1389    fn poll_write_vectored(
1390        self: Pin<&mut Self>,
1391        cx: &mut Context<'_>,
1392        bufs: &[IoSlice<'_>],
1393    ) -> Poll<io::Result<usize>> {
1394        loop {
1395            match (*self).get_ref().write_vectored(bufs) {
1396                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1397                res => return Poll::Ready(res),
1398            }
1399            ready!(self.poll_writable(cx))?;
1400        }
1401    }
1402
1403    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1404        loop {
1405            match (*self).get_ref().flush() {
1406                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1407                res => return Poll::Ready(res),
1408            }
1409            ready!(self.poll_writable(cx))?;
1410        }
1411    }
1412
1413    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1414        self.poll_flush(cx)
1415    }
1416}
1417
1418impl Async<TcpListener> {
1419    /// Creates a TCP listener bound to the specified address.
1420    ///
1421    /// Binding with port number 0 will request an available port from the OS.
1422    ///
1423    /// # Examples
1424    ///
1425    /// ```
1426    /// use async_io::Async;
1427    /// use std::net::TcpListener;
1428    ///
1429    /// # futures_lite::future::block_on(async {
1430    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
1431    /// println!("Listening on {}", listener.get_ref().local_addr()?);
1432    /// # std::io::Result::Ok(()) });
1433    /// ```
1434    pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpListener>> {
1435        let addr = addr.into();
1436        Async::new(TcpListener::bind(addr)?)
1437    }
1438
1439    /// Accepts a new incoming TCP connection.
1440    ///
1441    /// When a connection is established, it will be returned as a TCP stream together with its
1442    /// remote address.
1443    ///
1444    /// # Examples
1445    ///
1446    /// ```no_run
1447    /// use async_io::Async;
1448    /// use std::net::TcpListener;
1449    ///
1450    /// # futures_lite::future::block_on(async {
1451    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?;
1452    /// let (stream, addr) = listener.accept().await?;
1453    /// println!("Accepted client: {}", addr);
1454    /// # std::io::Result::Ok(()) });
1455    /// ```
1456    pub async fn accept(&self) -> io::Result<(Async<TcpStream>, SocketAddr)> {
1457        let (stream, addr) = self.read_with(|io| io.accept()).await?;
1458        Ok((Async::new(stream)?, addr))
1459    }
1460
1461    /// Returns a stream of incoming TCP connections.
1462    ///
1463    /// The stream is infinite, i.e. it never stops with a [`None`].
1464    ///
1465    /// # Examples
1466    ///
1467    /// ```no_run
1468    /// use async_io::Async;
1469    /// use futures_lite::{pin, stream::StreamExt};
1470    /// use std::net::TcpListener;
1471    ///
1472    /// # futures_lite::future::block_on(async {
1473    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?;
1474    /// let incoming = listener.incoming();
1475    /// pin!(incoming);
1476    ///
1477    /// while let Some(stream) = incoming.next().await {
1478    ///     let stream = stream?;
1479    ///     println!("Accepted client: {}", stream.get_ref().peer_addr()?);
1480    /// }
1481    /// # std::io::Result::Ok(()) });
1482    /// ```
1483    pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<TcpStream>>> + Send + '_ {
1484        stream::unfold(self, |listener| async move {
1485            let res = listener.accept().await.map(|(stream, _)| stream);
1486            Some((res, listener))
1487        })
1488    }
1489}
1490
1491impl TryFrom<std::net::TcpListener> for Async<std::net::TcpListener> {
1492    type Error = io::Error;
1493
1494    fn try_from(listener: std::net::TcpListener) -> io::Result<Self> {
1495        Async::new(listener)
1496    }
1497}
1498
1499impl Async<TcpStream> {
1500    /// Creates a TCP connection to the specified address.
1501    ///
1502    /// # Examples
1503    ///
1504    /// ```
1505    /// use async_io::Async;
1506    /// use std::net::{TcpStream, ToSocketAddrs};
1507    ///
1508    /// # futures_lite::future::block_on(async {
1509    /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
1510    /// let stream = Async::<TcpStream>::connect(addr).await?;
1511    /// # std::io::Result::Ok(()) });
1512    /// ```
1513    pub async fn connect<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpStream>> {
1514        // Figure out how to handle this address.
1515        let addr = addr.into();
1516        let (domain, sock_addr) = match addr {
1517            SocketAddr::V4(v4) => (rn::AddressFamily::INET, v4.as_any()),
1518            SocketAddr::V6(v6) => (rn::AddressFamily::INET6, v6.as_any()),
1519        };
1520
1521        // Begin async connect.
1522        let socket = connect(sock_addr, domain, Some(rn::ipproto::TCP))?;
1523        // Use new_nonblocking because connect already sets socket to non-blocking mode.
1524        let stream = Async::new_nonblocking(TcpStream::from(socket))?;
1525
1526        // The stream becomes writable when connected.
1527        stream.writable().await?;
1528
1529        // Check if there was an error while connecting.
1530        match stream.get_ref().take_error()? {
1531            None => Ok(stream),
1532            Some(err) => Err(err),
1533        }
1534    }
1535
1536    /// Reads data from the stream without removing it from the buffer.
1537    ///
1538    /// Returns the number of bytes read. Successive calls of this method read the same data.
1539    ///
1540    /// # Examples
1541    ///
1542    /// ```
1543    /// use async_io::Async;
1544    /// use futures_lite::{io::AsyncWriteExt, stream::StreamExt};
1545    /// use std::net::{TcpStream, ToSocketAddrs};
1546    ///
1547    /// # futures_lite::future::block_on(async {
1548    /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
1549    /// let mut stream = Async::<TcpStream>::connect(addr).await?;
1550    ///
1551    /// stream
1552    ///     .write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
1553    ///     .await?;
1554    ///
1555    /// let mut buf = [0u8; 1024];
1556    /// let len = stream.peek(&mut buf).await?;
1557    /// # std::io::Result::Ok(()) });
1558    /// ```
1559    pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1560        self.read_with(|io| io.peek(buf)).await
1561    }
1562}
1563
1564impl TryFrom<std::net::TcpStream> for Async<std::net::TcpStream> {
1565    type Error = io::Error;
1566
1567    fn try_from(stream: std::net::TcpStream) -> io::Result<Self> {
1568        Async::new(stream)
1569    }
1570}
1571
1572impl Async<UdpSocket> {
1573    /// Creates a UDP socket bound to the specified address.
1574    ///
1575    /// Binding with port number 0 will request an available port from the OS.
1576    ///
1577    /// # Examples
1578    ///
1579    /// ```
1580    /// use async_io::Async;
1581    /// use std::net::UdpSocket;
1582    ///
1583    /// # futures_lite::future::block_on(async {
1584    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
1585    /// println!("Bound to {}", socket.get_ref().local_addr()?);
1586    /// # std::io::Result::Ok(()) });
1587    /// ```
1588    pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<UdpSocket>> {
1589        let addr = addr.into();
1590        Async::new(UdpSocket::bind(addr)?)
1591    }
1592
1593    /// Receives a single datagram message.
1594    ///
1595    /// Returns the number of bytes read and the address the message came from.
1596    ///
1597    /// This method must be called with a valid byte slice of sufficient size to hold the message.
1598    /// If the message is too long to fit, excess bytes may get discarded.
1599    ///
1600    /// # Examples
1601    ///
1602    /// ```no_run
1603    /// use async_io::Async;
1604    /// use std::net::UdpSocket;
1605    ///
1606    /// # futures_lite::future::block_on(async {
1607    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1608    ///
1609    /// let mut buf = [0u8; 1024];
1610    /// let (len, addr) = socket.recv_from(&mut buf).await?;
1611    /// # std::io::Result::Ok(()) });
1612    /// ```
1613    pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1614        self.read_with(|io| io.recv_from(buf)).await
1615    }
1616
1617    /// Receives a single datagram message without removing it from the queue.
1618    ///
1619    /// Returns the number of bytes read and the address the message came from.
1620    ///
1621    /// This method must be called with a valid byte slice of sufficient size to hold the message.
1622    /// If the message is too long to fit, excess bytes may get discarded.
1623    ///
1624    /// # Examples
1625    ///
1626    /// ```no_run
1627    /// use async_io::Async;
1628    /// use std::net::UdpSocket;
1629    ///
1630    /// # futures_lite::future::block_on(async {
1631    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1632    ///
1633    /// let mut buf = [0u8; 1024];
1634    /// let (len, addr) = socket.peek_from(&mut buf).await?;
1635    /// # std::io::Result::Ok(()) });
1636    /// ```
1637    pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1638        self.read_with(|io| io.peek_from(buf)).await
1639    }
1640
1641    /// Sends data to the specified address.
1642    ///
1643    /// Returns the number of bytes written.
1644    ///
1645    /// # Examples
1646    ///
1647    /// ```no_run
1648    /// use async_io::Async;
1649    /// use std::net::UdpSocket;
1650    ///
1651    /// # futures_lite::future::block_on(async {
1652    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
1653    /// let addr = socket.get_ref().local_addr()?;
1654    ///
1655    /// let msg = b"hello";
1656    /// let len = socket.send_to(msg, addr).await?;
1657    /// # std::io::Result::Ok(()) });
1658    /// ```
1659    pub async fn send_to<A: Into<SocketAddr>>(&self, buf: &[u8], addr: A) -> io::Result<usize> {
1660        let addr = addr.into();
1661        self.write_with(|io| io.send_to(buf, addr)).await
1662    }
1663
1664    /// Receives a single datagram message from the connected peer.
1665    ///
1666    /// Returns the number of bytes read.
1667    ///
1668    /// This method must be called with a valid byte slice of sufficient size to hold the message.
1669    /// If the message is too long to fit, excess bytes may get discarded.
1670    ///
1671    /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1672    /// This method will fail if the socket is not connected.
1673    ///
1674    /// # Examples
1675    ///
1676    /// ```no_run
1677    /// use async_io::Async;
1678    /// use std::net::UdpSocket;
1679    ///
1680    /// # futures_lite::future::block_on(async {
1681    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1682    /// socket.get_ref().connect("127.0.0.1:9000")?;
1683    ///
1684    /// let mut buf = [0u8; 1024];
1685    /// let len = socket.recv(&mut buf).await?;
1686    /// # std::io::Result::Ok(()) });
1687    /// ```
1688    pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
1689        self.read_with(|io| io.recv(buf)).await
1690    }
1691
1692    /// Receives a single datagram message from the connected peer without removing it from the
1693    /// queue.
1694    ///
1695    /// Returns the number of bytes read and the address the message came from.
1696    ///
1697    /// This method must be called with a valid byte slice of sufficient size to hold the message.
1698    /// If the message is too long to fit, excess bytes may get discarded.
1699    ///
1700    /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1701    /// This method will fail if the socket is not connected.
1702    ///
1703    /// # Examples
1704    ///
1705    /// ```no_run
1706    /// use async_io::Async;
1707    /// use std::net::UdpSocket;
1708    ///
1709    /// # futures_lite::future::block_on(async {
1710    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1711    /// socket.get_ref().connect("127.0.0.1:9000")?;
1712    ///
1713    /// let mut buf = [0u8; 1024];
1714    /// let len = socket.peek(&mut buf).await?;
1715    /// # std::io::Result::Ok(()) });
1716    /// ```
1717    pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1718        self.read_with(|io| io.peek(buf)).await
1719    }
1720
1721    /// Sends data to the connected peer.
1722    ///
1723    /// Returns the number of bytes written.
1724    ///
1725    /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1726    /// This method will fail if the socket is not connected.
1727    ///
1728    /// # Examples
1729    ///
1730    /// ```no_run
1731    /// use async_io::Async;
1732    /// use std::net::UdpSocket;
1733    ///
1734    /// # futures_lite::future::block_on(async {
1735    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1736    /// socket.get_ref().connect("127.0.0.1:9000")?;
1737    ///
1738    /// let msg = b"hello";
1739    /// let len = socket.send(msg).await?;
1740    /// # std::io::Result::Ok(()) });
1741    /// ```
1742    pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
1743        self.write_with(|io| io.send(buf)).await
1744    }
1745}
1746
1747impl TryFrom<std::net::UdpSocket> for Async<std::net::UdpSocket> {
1748    type Error = io::Error;
1749
1750    fn try_from(socket: std::net::UdpSocket) -> io::Result<Self> {
1751        Async::new(socket)
1752    }
1753}
1754
1755#[cfg(unix)]
1756impl Async<UnixListener> {
1757    /// Creates a UDS listener bound to the specified path.
1758    ///
1759    /// # Examples
1760    ///
1761    /// ```no_run
1762    /// use async_io::Async;
1763    /// use std::os::unix::net::UnixListener;
1764    ///
1765    /// # futures_lite::future::block_on(async {
1766    /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1767    /// println!("Listening on {:?}", listener.get_ref().local_addr()?);
1768    /// # std::io::Result::Ok(()) });
1769    /// ```
1770    pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixListener>> {
1771        let path = path.as_ref().to_owned();
1772        Async::new(UnixListener::bind(path)?)
1773    }
1774
1775    /// Accepts a new incoming UDS stream connection.
1776    ///
1777    /// When a connection is established, it will be returned as a stream together with its remote
1778    /// address.
1779    ///
1780    /// # Examples
1781    ///
1782    /// ```no_run
1783    /// use async_io::Async;
1784    /// use std::os::unix::net::UnixListener;
1785    ///
1786    /// # futures_lite::future::block_on(async {
1787    /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1788    /// let (stream, addr) = listener.accept().await?;
1789    /// println!("Accepted client: {:?}", addr);
1790    /// # std::io::Result::Ok(()) });
1791    /// ```
1792    pub async fn accept(&self) -> io::Result<(Async<UnixStream>, UnixSocketAddr)> {
1793        let (stream, addr) = self.read_with(|io| io.accept()).await?;
1794        Ok((Async::new(stream)?, addr))
1795    }
1796
1797    /// Returns a stream of incoming UDS connections.
1798    ///
1799    /// The stream is infinite, i.e. it never stops with a [`None`] item.
1800    ///
1801    /// # Examples
1802    ///
1803    /// ```no_run
1804    /// use async_io::Async;
1805    /// use futures_lite::{pin, stream::StreamExt};
1806    /// use std::os::unix::net::UnixListener;
1807    ///
1808    /// # futures_lite::future::block_on(async {
1809    /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1810    /// let incoming = listener.incoming();
1811    /// pin!(incoming);
1812    ///
1813    /// while let Some(stream) = incoming.next().await {
1814    ///     let stream = stream?;
1815    ///     println!("Accepted client: {:?}", stream.get_ref().peer_addr()?);
1816    /// }
1817    /// # std::io::Result::Ok(()) });
1818    /// ```
1819    pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<UnixStream>>> + Send + '_ {
1820        stream::unfold(self, |listener| async move {
1821            let res = listener.accept().await.map(|(stream, _)| stream);
1822            Some((res, listener))
1823        })
1824    }
1825}
1826
1827#[cfg(unix)]
1828impl TryFrom<std::os::unix::net::UnixListener> for Async<std::os::unix::net::UnixListener> {
1829    type Error = io::Error;
1830
1831    fn try_from(listener: std::os::unix::net::UnixListener) -> io::Result<Self> {
1832        Async::new(listener)
1833    }
1834}
1835
1836#[cfg(unix)]
1837impl Async<UnixStream> {
1838    /// Creates a UDS stream connected to the specified path.
1839    ///
1840    /// # Examples
1841    ///
1842    /// ```no_run
1843    /// use async_io::Async;
1844    /// use std::os::unix::net::UnixStream;
1845    ///
1846    /// # futures_lite::future::block_on(async {
1847    /// let stream = Async::<UnixStream>::connect("/tmp/socket").await?;
1848    /// # std::io::Result::Ok(()) });
1849    /// ```
1850    pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixStream>> {
1851        let address = convert_path_to_socket_address(path.as_ref())?;
1852
1853        // Begin async connect.
1854        let socket = connect(address.into(), rn::AddressFamily::UNIX, None)?;
1855        // Use new_nonblocking because connect already sets socket to non-blocking mode.
1856        let stream = Async::new_nonblocking(UnixStream::from(socket))?;
1857
1858        // The stream becomes writable when connected.
1859        stream.writable().await?;
1860
1861        // On Linux, it appears the socket may become writable even when connecting fails, so we
1862        // must do an extra check here and see if the peer address is retrievable.
1863        stream.get_ref().peer_addr()?;
1864        Ok(stream)
1865    }
1866
1867    /// Creates an unnamed pair of connected UDS stream sockets.
1868    ///
1869    /// # Examples
1870    ///
1871    /// ```no_run
1872    /// use async_io::Async;
1873    /// use std::os::unix::net::UnixStream;
1874    ///
1875    /// # futures_lite::future::block_on(async {
1876    /// let (stream1, stream2) = Async::<UnixStream>::pair()?;
1877    /// # std::io::Result::Ok(()) });
1878    /// ```
1879    pub fn pair() -> io::Result<(Async<UnixStream>, Async<UnixStream>)> {
1880        let (stream1, stream2) = UnixStream::pair()?;
1881        Ok((Async::new(stream1)?, Async::new(stream2)?))
1882    }
1883}
1884
1885#[cfg(unix)]
1886impl TryFrom<std::os::unix::net::UnixStream> for Async<std::os::unix::net::UnixStream> {
1887    type Error = io::Error;
1888
1889    fn try_from(stream: std::os::unix::net::UnixStream) -> io::Result<Self> {
1890        Async::new(stream)
1891    }
1892}
1893
1894#[cfg(unix)]
1895impl Async<UnixDatagram> {
1896    /// Creates a UDS datagram socket bound to the specified path.
1897    ///
1898    /// # Examples
1899    ///
1900    /// ```no_run
1901    /// use async_io::Async;
1902    /// use std::os::unix::net::UnixDatagram;
1903    ///
1904    /// # futures_lite::future::block_on(async {
1905    /// let socket = Async::<UnixDatagram>::bind("/tmp/socket")?;
1906    /// # std::io::Result::Ok(()) });
1907    /// ```
1908    pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixDatagram>> {
1909        let path = path.as_ref().to_owned();
1910        Async::new(UnixDatagram::bind(path)?)
1911    }
1912
1913    /// Creates a UDS datagram socket not bound to any address.
1914    ///
1915    /// # Examples
1916    ///
1917    /// ```no_run
1918    /// use async_io::Async;
1919    /// use std::os::unix::net::UnixDatagram;
1920    ///
1921    /// # futures_lite::future::block_on(async {
1922    /// let socket = Async::<UnixDatagram>::unbound()?;
1923    /// # std::io::Result::Ok(()) });
1924    /// ```
1925    pub fn unbound() -> io::Result<Async<UnixDatagram>> {
1926        Async::new(UnixDatagram::unbound()?)
1927    }
1928
1929    /// Creates an unnamed pair of connected Unix datagram sockets.
1930    ///
1931    /// # Examples
1932    ///
1933    /// ```no_run
1934    /// use async_io::Async;
1935    /// use std::os::unix::net::UnixDatagram;
1936    ///
1937    /// # futures_lite::future::block_on(async {
1938    /// let (socket1, socket2) = Async::<UnixDatagram>::pair()?;
1939    /// # std::io::Result::Ok(()) });
1940    /// ```
1941    pub fn pair() -> io::Result<(Async<UnixDatagram>, Async<UnixDatagram>)> {
1942        let (socket1, socket2) = UnixDatagram::pair()?;
1943        Ok((Async::new(socket1)?, Async::new(socket2)?))
1944    }
1945
1946    /// Receives data from the socket.
1947    ///
1948    /// Returns the number of bytes read and the address the message came from.
1949    ///
1950    /// # Examples
1951    ///
1952    /// ```no_run
1953    /// use async_io::Async;
1954    /// use std::os::unix::net::UnixDatagram;
1955    ///
1956    /// # futures_lite::future::block_on(async {
1957    /// let socket = Async::<UnixDatagram>::bind("/tmp/socket")?;
1958    ///
1959    /// let mut buf = [0u8; 1024];
1960    /// let (len, addr) = socket.recv_from(&mut buf).await?;
1961    /// # std::io::Result::Ok(()) });
1962    /// ```
1963    pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> {
1964        self.read_with(|io| io.recv_from(buf)).await
1965    }
1966
1967    /// Sends data to the specified address.
1968    ///
1969    /// Returns the number of bytes written.
1970    ///
1971    /// # Examples
1972    ///
1973    /// ```no_run
1974    /// use async_io::Async;
1975    /// use std::os::unix::net::UnixDatagram;
1976    ///
1977    /// # futures_lite::future::block_on(async {
1978    /// let socket = Async::<UnixDatagram>::unbound()?;
1979    ///
1980    /// let msg = b"hello";
1981    /// let addr = "/tmp/socket";
1982    /// let len = socket.send_to(msg, addr).await?;
1983    /// # std::io::Result::Ok(()) });
1984    /// ```
1985    pub async fn send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize> {
1986        self.write_with(|io| io.send_to(buf, &path)).await
1987    }
1988
1989    /// Receives data from the connected peer.
1990    ///
1991    /// Returns the number of bytes read and the address the message came from.
1992    ///
1993    /// The [`connect`][`UnixDatagram::connect()`] method connects this socket to a remote address.
1994    /// This method will fail if the socket is not connected.
1995    ///
1996    /// # Examples
1997    ///
1998    /// ```no_run
1999    /// use async_io::Async;
2000    /// use std::os::unix::net::UnixDatagram;
2001    ///
2002    /// # futures_lite::future::block_on(async {
2003    /// let socket = Async::<UnixDatagram>::bind("/tmp/socket1")?;
2004    /// socket.get_ref().connect("/tmp/socket2")?;
2005    ///
2006    /// let mut buf = [0u8; 1024];
2007    /// let len = socket.recv(&mut buf).await?;
2008    /// # std::io::Result::Ok(()) });
2009    /// ```
2010    pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
2011        self.read_with(|io| io.recv(buf)).await
2012    }
2013
2014    /// Sends data to the connected peer.
2015    ///
2016    /// Returns the number of bytes written.
2017    ///
2018    /// The [`connect`][`UnixDatagram::connect()`] method connects this socket to a remote address.
2019    /// This method will fail if the socket is not connected.
2020    ///
2021    /// # Examples
2022    ///
2023    /// ```no_run
2024    /// use async_io::Async;
2025    /// use std::os::unix::net::UnixDatagram;
2026    ///
2027    /// # futures_lite::future::block_on(async {
2028    /// let socket = Async::<UnixDatagram>::bind("/tmp/socket1")?;
2029    /// socket.get_ref().connect("/tmp/socket2")?;
2030    ///
2031    /// let msg = b"hello";
2032    /// let len = socket.send(msg).await?;
2033    /// # std::io::Result::Ok(()) });
2034    /// ```
2035    pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
2036        self.write_with(|io| io.send(buf)).await
2037    }
2038}
2039
2040#[cfg(unix)]
2041impl TryFrom<std::os::unix::net::UnixDatagram> for Async<std::os::unix::net::UnixDatagram> {
2042    type Error = io::Error;
2043
2044    fn try_from(socket: std::os::unix::net::UnixDatagram) -> io::Result<Self> {
2045        Async::new(socket)
2046    }
2047}
2048
2049/// Polls a future once, waits for a wakeup, and then optimistically assumes the future is ready.
2050async fn optimistic(fut: impl Future<Output = io::Result<()>>) -> io::Result<()> {
2051    let mut polled = false;
2052    pin!(fut);
2053
2054    future::poll_fn(|cx| {
2055        if !polled {
2056            polled = true;
2057            fut.as_mut().poll(cx)
2058        } else {
2059            Poll::Ready(Ok(()))
2060        }
2061    })
2062    .await
2063}
2064
2065fn connect(
2066    addr: rn::SocketAddrAny,
2067    domain: rn::AddressFamily,
2068    protocol: Option<rn::Protocol>,
2069) -> io::Result<rustix::fd::OwnedFd> {
2070    #[cfg(windows)]
2071    use rustix::fd::AsFd;
2072
2073    setup_networking();
2074
2075    #[cfg(any(
2076        target_os = "android",
2077        target_os = "dragonfly",
2078        target_os = "freebsd",
2079        target_os = "fuchsia",
2080        target_os = "illumos",
2081        target_os = "linux",
2082        target_os = "netbsd",
2083        target_os = "openbsd"
2084    ))]
2085    let socket = rn::socket_with(
2086        domain,
2087        rn::SocketType::STREAM,
2088        rn::SocketFlags::CLOEXEC | rn::SocketFlags::NONBLOCK,
2089        protocol,
2090    )?;
2091
2092    #[cfg(not(any(
2093        target_os = "android",
2094        target_os = "dragonfly",
2095        target_os = "freebsd",
2096        target_os = "fuchsia",
2097        target_os = "illumos",
2098        target_os = "linux",
2099        target_os = "netbsd",
2100        target_os = "openbsd"
2101    )))]
2102    let socket = {
2103        #[cfg(not(any(
2104            target_os = "aix",
2105            target_vendor = "apple",
2106            target_os = "espidf",
2107            windows,
2108        )))]
2109        let flags = rn::SocketFlags::CLOEXEC;
2110        #[cfg(any(
2111            target_os = "aix",
2112            target_vendor = "apple",
2113            target_os = "espidf",
2114            windows,
2115        ))]
2116        let flags = rn::SocketFlags::empty();
2117
2118        // Create the socket.
2119        let socket = rn::socket_with(domain, rn::SocketType::STREAM, flags, protocol)?;
2120
2121        // Set cloexec if necessary.
2122        #[cfg(any(target_os = "aix", target_vendor = "apple"))]
2123        rio::fcntl_setfd(&socket, rio::fcntl_getfd(&socket)? | rio::FdFlags::CLOEXEC)?;
2124
2125        // Set non-blocking mode.
2126        set_nonblocking(socket.as_fd())?;
2127
2128        socket
2129    };
2130
2131    // Set nosigpipe if necessary.
2132    #[cfg(any(
2133        target_vendor = "apple",
2134        target_os = "freebsd",
2135        target_os = "netbsd",
2136        target_os = "dragonfly",
2137    ))]
2138    rn::sockopt::set_socket_nosigpipe(&socket, true)?;
2139
2140    // Set the handle information to HANDLE_FLAG_INHERIT.
2141    #[cfg(windows)]
2142    unsafe {
2143        if windows_sys::Win32::Foundation::SetHandleInformation(
2144            socket.as_raw_socket() as _,
2145            windows_sys::Win32::Foundation::HANDLE_FLAG_INHERIT,
2146            windows_sys::Win32::Foundation::HANDLE_FLAG_INHERIT,
2147        ) == 0
2148        {
2149            return Err(io::Error::last_os_error());
2150        }
2151    }
2152
2153    #[allow(unreachable_patterns)]
2154    match rn::connect(&socket, &addr) {
2155        Ok(_) => {}
2156        #[cfg(unix)]
2157        Err(rio::Errno::INPROGRESS) => {}
2158        Err(rio::Errno::AGAIN) | Err(rio::Errno::WOULDBLOCK) => {}
2159        Err(err) => return Err(err.into()),
2160    }
2161    Ok(socket)
2162}
2163
/// Performs one-time networking initialization; compiles to nothing off Windows.
///
/// The whole body is gated on `cfg(windows)`. There, `wsa_startup` is called through a
/// [`std::sync::Once`] the first time this function runs, and its result is ignored.
#[inline]
fn setup_networking() {
    #[cfg(windows)]
    {
        // On Windows, WSAStartup must be called before any other networking code.
        // The `Once` guarantees it has been called by the time this function returns.
        static WSA_STARTED: std::sync::Once = std::sync::Once::new();

        WSA_STARTED.call_once(|| {
            let _ = rustix::net::wsa_startup();
        });
    }
}
2177
2178#[inline]
2179fn set_nonblocking(
2180    #[cfg(unix)] fd: BorrowedFd<'_>,
2181    #[cfg(windows)] fd: BorrowedSocket<'_>,
2182) -> io::Result<()> {
2183    cfg_if::cfg_if! {
2184        // ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux
2185        // for now, as with the standard library, because it seems to behave
2186        // differently depending on the platform.
2187        // https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d
2188        // https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80
2189        // https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a
2190        if #[cfg(any(windows, target_os = "linux"))] {
2191            rustix::io::ioctl_fionbio(fd, true)?;
2192        } else {
2193            let previous = rustix::fs::fcntl_getfl(fd)?;
2194            let new = previous | rustix::fs::OFlags::NONBLOCK;
2195            if new != previous {
2196                rustix::fs::fcntl_setfl(fd, new)?;
2197            }
2198        }
2199    }
2200
2201    Ok(())
2202}
2203
2204/// Converts a `Path` to its socket address representation.
2205///
2206/// This function is abstract socket-aware.
2207#[cfg(unix)]
2208#[inline]
2209fn convert_path_to_socket_address(path: &Path) -> io::Result<rn::SocketAddrUnix> {
2210    // SocketAddrUnix::new() will throw EINVAL when a path with a zero in it is passed in.
2211    // However, some users expect to be able to pass in paths to abstract sockets, which
2212    // triggers this error as it has a zero in it. Therefore, if a path starts with a zero,
2213    // make it an abstract socket.
2214    #[cfg(any(target_os = "linux", target_os = "android"))]
2215    let address = {
2216        use std::os::unix::ffi::OsStrExt;
2217
2218        let path = path.as_os_str();
2219        match path.as_bytes().first() {
2220            Some(0) => rn::SocketAddrUnix::new_abstract_name(path.as_bytes().get(1..).unwrap())?,
2221            _ => rn::SocketAddrUnix::new(path)?,
2222        }
2223    };
2224
2225    // Only Linux and Android support abstract sockets.
2226    #[cfg(not(any(target_os = "linux", target_os = "android")))]
2227    let address = rn::SocketAddrUnix::new(path)?;
2228
2229    Ok(address)
2230}