async_io/lib.rs
1//! Async I/O and timers.
2//!
3//! This crate provides two tools:
4//!
5//! * [`Async`], an adapter for standard networking types (and [many other] types) to use in
6//! async programs.
7//! * [`Timer`], a future or stream that emits timed events.
8//!
9//! For concrete async networking types built on top of this crate, see [`async-net`].
10//!
11//! [many other]: https://github.com/smol-rs/async-io/tree/master/examples
12//! [`async-net`]: https://docs.rs/async-net
13//!
14//! # Implementation
15//!
16//! The first time [`Async`] or [`Timer`] is used, a thread named "async-io" will be spawned.
17//! The purpose of this thread is to wait for I/O events reported by the operating system, and then
18//! wake appropriate futures blocked on I/O or timers when they can be resumed.
19//!
20//! To wait for the next I/O event, the "async-io" thread uses [epoll] on Linux/Android/illumos,
21//! [kqueue] on macOS/iOS/BSD, [event ports] on illumos/Solaris, and [IOCP] on Windows. That
22//! functionality is provided by the [`polling`] crate.
23//!
24//! However, note that you can also process I/O events and wake futures on any thread using the
25//! [`block_on()`] function. The "async-io" thread is therefore just a fallback mechanism
26//! processing I/O events in case no other threads are.
27//!
28//! [epoll]: https://en.wikipedia.org/wiki/Epoll
29//! [kqueue]: https://en.wikipedia.org/wiki/Kqueue
30//! [event ports]: https://illumos.org/man/port_create
31//! [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
32//! [`polling`]: https://docs.rs/polling
33//!
34//! # Examples
35//!
36//! Connect to `example.com:80`, or time out after 10 seconds.
37//!
38//! ```
39//! use async_io::{Async, Timer};
40//! use futures_lite::{future::FutureExt, io};
41//!
42//! use std::net::{TcpStream, ToSocketAddrs};
43//! use std::time::Duration;
44//!
45//! # futures_lite::future::block_on(async {
46//! let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
47//!
48//! let stream = Async::<TcpStream>::connect(addr).or(async {
49//! Timer::after(Duration::from_secs(10)).await;
50//! Err(io::ErrorKind::TimedOut.into())
51//! })
52//! .await?;
53//! # std::io::Result::Ok(()) });
54//! ```
55
56#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
57#![doc(
58 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
59)]
60#![doc(
61 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
62)]
63
64use std::future::{poll_fn, Future};
65use std::io::{self, IoSlice, IoSliceMut, Read, Write};
66use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket};
67use std::pin::{pin, Pin};
68use std::sync::Arc;
69use std::task::{ready, Context, Poll, Waker};
70use std::time::{Duration, Instant};
71
72#[cfg(unix)]
73use std::{
74 os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd},
75 os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
76 path::Path,
77};
78
79#[cfg(windows)]
80use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, OwnedSocket, RawSocket};
81
82use futures_io::{AsyncRead, AsyncWrite};
83use futures_lite::stream::{self, Stream};
84
85use rustix::io as rio;
86use rustix::net as rn;
87use rustix::net::addr::SocketAddrArg;
88
89use crate::reactor::{Reactor, Registration, Source};
90
91mod driver;
92mod reactor;
93
94pub mod os;
95
96pub use driver::block_on;
97pub use reactor::{Readable, ReadableOwned, Writable, WritableOwned};
98
99/// A future or stream that emits timed events.
100///
101/// Timers are futures that output a single [`Instant`] when they fire.
102///
103/// Timers are also streams that can output [`Instant`]s periodically.
104///
105/// # Precision
106///
107/// There is a limit on the maximum precision that a `Timer` can provide. This limit is
108/// dependent on the current platform; for instance, on Windows, the maximum precision is
109/// about 16 milliseconds. Because of this limit, the timer may sleep for longer than the
110/// requested duration. It will never sleep for less.
111///
112/// # Examples
113///
114/// Sleep for 1 second:
115///
116/// ```
117/// use async_io::Timer;
118/// use std::time::Duration;
119///
120/// # futures_lite::future::block_on(async {
121/// Timer::after(Duration::from_secs(1)).await;
122/// # });
123/// ```
124///
125/// Timeout after 1 second:
126///
127/// ```
128/// use async_io::Timer;
129/// use futures_lite::FutureExt;
130/// use std::time::Duration;
131///
132/// # futures_lite::future::block_on(async {
133/// let addrs = async_net::resolve("google.com:80")
134/// .or(async {
135/// Timer::after(Duration::from_secs(1)).await;
136/// Err(std::io::ErrorKind::TimedOut.into())
137/// })
138/// .await?;
139/// # std::io::Result::Ok(()) });
140/// ```
141#[doc(alias = "sleep")]
142#[doc(alias = "timeout")]
143#[derive(Debug)]
pub struct Timer {
    /// This timer's ID and last waker that polled it.
    ///
    /// When this field is set to `None`, this timer is not registered in the reactor.
    ///
    /// The converse does not hold: `clear()` deregisters the timer but leaves this field in
    /// place, so that a later `set_at()`/`set_interval_at()` call can re-register the timer
    /// with the same waker. The ID is the value returned by `Reactor::insert_timer()` and is
    /// what `Reactor::remove_timer()` needs to deregister the timer again.
    id_and_waker: Option<(usize, Waker)>,

    /// The next instant at which this timer fires.
    ///
    /// If this timer is a blank timer, this value is None. If the timer
    /// must be set, this value contains the next instant at which the
    /// timer must fire.
    ///
    /// The value also becomes `None` once the next tick (`when + period`) can no longer be
    /// represented as an `Instant`, which is how one-shot timers stop after firing.
    when: Option<Instant>,

    /// The period.
    ///
    /// One-shot timers store `Duration::MAX` here; adding it to `when` overflows, so no
    /// further tick is scheduled after the first one.
    period: Duration,
}
160
161impl Timer {
162 /// Creates a timer that will never fire.
163 ///
164 /// # Examples
165 ///
166 /// This function may also be useful for creating a function with an optional timeout.
167 ///
168 /// ```
169 /// # futures_lite::future::block_on(async {
170 /// use async_io::Timer;
171 /// use futures_lite::prelude::*;
172 /// use std::time::Duration;
173 ///
174 /// async fn run_with_timeout(timeout: Option<Duration>) {
175 /// let timer = timeout
176 /// .map(|timeout| Timer::after(timeout))
177 /// .unwrap_or_else(Timer::never);
178 ///
179 /// run_lengthy_operation().or(timer).await;
180 /// }
181 /// # // Note that since a Timer as a Future returns an Instant,
182 /// # // this function needs to return an Instant to be used
183 /// # // in "or".
184 /// # async fn run_lengthy_operation() -> std::time::Instant {
185 /// # std::time::Instant::now()
186 /// # }
187 ///
188 /// // Times out after 5 seconds.
189 /// run_with_timeout(Some(Duration::from_secs(5))).await;
190 /// // Does not time out.
191 /// run_with_timeout(None).await;
192 /// # });
193 /// ```
194 pub fn never() -> Timer {
195 Timer {
196 id_and_waker: None,
197 when: None,
198 period: Duration::MAX,
199 }
200 }
201
202 /// Creates a timer that emits an event once after the given duration of time.
203 ///
204 /// # Examples
205 ///
206 /// ```
207 /// use async_io::Timer;
208 /// use std::time::Duration;
209 ///
210 /// # futures_lite::future::block_on(async {
211 /// Timer::after(Duration::from_secs(1)).await;
212 /// # });
213 /// ```
214 pub fn after(duration: Duration) -> Timer {
215 Instant::now()
216 .checked_add(duration)
217 .map_or_else(Timer::never, Timer::at)
218 }
219
220 /// Creates a timer that emits an event once at the given time instant.
221 ///
222 /// # Examples
223 ///
224 /// ```
225 /// use async_io::Timer;
226 /// use std::time::{Duration, Instant};
227 ///
228 /// # futures_lite::future::block_on(async {
229 /// let now = Instant::now();
230 /// let when = now + Duration::from_secs(1);
231 /// Timer::at(when).await;
232 /// # });
233 /// ```
234 pub fn at(instant: Instant) -> Timer {
235 Timer::interval_at(instant, Duration::MAX)
236 }
237
238 /// Creates a timer that emits events periodically.
239 ///
240 /// # Examples
241 ///
242 /// ```
243 /// use async_io::Timer;
244 /// use futures_lite::StreamExt;
245 /// use std::time::{Duration, Instant};
246 ///
247 /// # futures_lite::future::block_on(async {
248 /// let period = Duration::from_secs(1);
249 /// Timer::interval(period).next().await;
250 /// # });
251 /// ```
252 pub fn interval(period: Duration) -> Timer {
253 Instant::now()
254 .checked_add(period)
255 .map_or_else(Timer::never, |at| Timer::interval_at(at, period))
256 }
257
258 /// Creates a timer that emits events periodically, starting at `start`.
259 ///
260 /// # Examples
261 ///
262 /// ```
263 /// use async_io::Timer;
264 /// use futures_lite::StreamExt;
265 /// use std::time::{Duration, Instant};
266 ///
267 /// # futures_lite::future::block_on(async {
268 /// let start = Instant::now();
269 /// let period = Duration::from_secs(1);
270 /// Timer::interval_at(start, period).next().await;
271 /// # });
272 /// ```
273 pub fn interval_at(start: Instant, period: Duration) -> Timer {
274 Timer {
275 id_and_waker: None,
276 when: Some(start),
277 period,
278 }
279 }
280
281 /// Indicates whether or not this timer will ever fire.
282 ///
283 /// [`never()`] will never fire, and timers created with [`after()`] or [`at()`] will fire
284 /// if the duration is not too large.
285 ///
286 /// [`never()`]: Timer::never()
287 /// [`after()`]: Timer::after()
288 /// [`at()`]: Timer::at()
289 ///
290 /// # Examples
291 ///
292 /// ```
293 /// # futures_lite::future::block_on(async {
294 /// use async_io::Timer;
295 /// use futures_lite::prelude::*;
296 /// use std::time::Duration;
297 ///
298 /// // `never` will never fire.
299 /// assert!(!Timer::never().will_fire());
300 ///
301 /// // `after` will fire if the duration is not too large.
302 /// assert!(Timer::after(Duration::from_secs(1)).will_fire());
303 /// assert!(!Timer::after(Duration::MAX).will_fire());
304 ///
305 /// // However, once an `after` timer has fired, it will never fire again.
306 /// let mut t = Timer::after(Duration::from_secs(1));
307 /// assert!(t.will_fire());
308 /// (&mut t).await;
309 /// assert!(!t.will_fire());
310 ///
311 /// // Interval timers will fire periodically.
312 /// let mut t = Timer::interval(Duration::from_secs(1));
313 /// assert!(t.will_fire());
314 /// t.next().await;
315 /// assert!(t.will_fire());
316 /// # });
317 /// ```
318 #[inline]
319 pub fn will_fire(&self) -> bool {
320 self.when.is_some()
321 }
322
323 /// Sets the timer to emit an event once after the given duration of time.
324 ///
325 /// Note that resetting a timer is different from creating a new timer because
326 /// [`set_after()`][`Timer::set_after()`] does not remove the waker associated with the task
327 /// that is polling the timer.
328 ///
329 /// # Examples
330 ///
331 /// ```
332 /// use async_io::Timer;
333 /// use std::time::Duration;
334 ///
335 /// # futures_lite::future::block_on(async {
336 /// let mut t = Timer::after(Duration::from_secs(1));
337 /// t.set_after(Duration::from_millis(100));
338 /// # });
339 /// ```
340 pub fn set_after(&mut self, duration: Duration) {
341 match Instant::now().checked_add(duration) {
342 Some(instant) => self.set_at(instant),
343 None => {
344 // Overflow to never going off.
345 self.clear();
346 }
347 }
348 }
349
350 /// Sets the timer to emit an event once at the given time instant.
351 ///
352 /// Note that resetting a timer is different from creating a new timer because
353 /// [`set_at()`][`Timer::set_at()`] does not remove the waker associated with the task
354 /// that is polling the timer.
355 ///
356 /// # Examples
357 ///
358 /// ```
359 /// use async_io::Timer;
360 /// use std::time::{Duration, Instant};
361 ///
362 /// # futures_lite::future::block_on(async {
363 /// let mut t = Timer::after(Duration::from_secs(1));
364 ///
365 /// let now = Instant::now();
366 /// let when = now + Duration::from_secs(1);
367 /// t.set_at(when);
368 /// # });
369 /// ```
370 pub fn set_at(&mut self, instant: Instant) {
371 self.clear();
372
373 // Update the timeout.
374 self.when = Some(instant);
375
376 if let Some((id, waker)) = self.id_and_waker.as_mut() {
377 // Re-register the timer with the new timeout.
378 *id = Reactor::get().insert_timer(instant, waker);
379 }
380 }
381
382 /// Sets the timer to emit events periodically.
383 ///
384 /// Note that resetting a timer is different from creating a new timer because
385 /// [`set_interval()`][`Timer::set_interval()`] does not remove the waker associated with the
386 /// task that is polling the timer.
387 ///
388 /// # Examples
389 ///
390 /// ```
391 /// use async_io::Timer;
392 /// use futures_lite::StreamExt;
393 /// use std::time::{Duration, Instant};
394 ///
395 /// # futures_lite::future::block_on(async {
396 /// let mut t = Timer::after(Duration::from_secs(1));
397 ///
398 /// let period = Duration::from_secs(2);
399 /// t.set_interval(period);
400 /// # });
401 /// ```
402 pub fn set_interval(&mut self, period: Duration) {
403 match Instant::now().checked_add(period) {
404 Some(instant) => self.set_interval_at(instant, period),
405 None => {
406 // Overflow to never going off.
407 self.clear();
408 }
409 }
410 }
411
412 /// Sets the timer to emit events periodically, starting at `start`.
413 ///
414 /// Note that resetting a timer is different from creating a new timer because
415 /// [`set_interval_at()`][`Timer::set_interval_at()`] does not remove the waker associated with
416 /// the task that is polling the timer.
417 ///
418 /// # Examples
419 ///
420 /// ```
421 /// use async_io::Timer;
422 /// use futures_lite::StreamExt;
423 /// use std::time::{Duration, Instant};
424 ///
425 /// # futures_lite::future::block_on(async {
426 /// let mut t = Timer::after(Duration::from_secs(1));
427 ///
428 /// let start = Instant::now();
429 /// let period = Duration::from_secs(2);
430 /// t.set_interval_at(start, period);
431 /// # });
432 /// ```
433 pub fn set_interval_at(&mut self, start: Instant, period: Duration) {
434 self.clear();
435
436 self.when = Some(start);
437 self.period = period;
438
439 if let Some((id, waker)) = self.id_and_waker.as_mut() {
440 // Re-register the timer with the new timeout.
441 *id = Reactor::get().insert_timer(start, waker);
442 }
443 }
444
445 /// Clear any timeouts set on this timer. It will never fire again until a new interval or instant is set.
446 pub fn clear(&mut self) {
447 if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.as_ref()) {
448 // Deregister the timer from the reactor.
449 Reactor::get().remove_timer(when, *id);
450 }
451 self.when = None;
452 }
453}
454
455impl Drop for Timer {
456 fn drop(&mut self) {
457 if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.take()) {
458 // Deregister the timer from the reactor.
459 Reactor::get().remove_timer(when, id);
460 }
461 }
462}
463
464impl Future for Timer {
465 type Output = Instant;
466
467 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
468 match self.poll_next(cx) {
469 Poll::Ready(Some(when)) => Poll::Ready(when),
470 Poll::Pending => Poll::Pending,
471 Poll::Ready(None) => unreachable!(),
472 }
473 }
474}
475
476impl Stream for Timer {
477 type Item = Instant;
478
479 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
480 let this = self.get_mut();
481
482 if let Some(ref mut when) = this.when {
483 // Check if the timer has already fired.
484 if Instant::now() >= *when {
485 if let Some((id, _)) = this.id_and_waker.take() {
486 // Deregister the timer from the reactor.
487 Reactor::get().remove_timer(*when, id);
488 }
489 let result_time = *when;
490 if let Some(next) = (*when).checked_add(this.period) {
491 *when = next;
492 // Register the timer in the reactor.
493 let id = Reactor::get().insert_timer(next, cx.waker());
494 this.id_and_waker = Some((id, cx.waker().clone()));
495 } else {
496 this.when = None;
497 }
498 return Poll::Ready(Some(result_time));
499 } else {
500 match &this.id_and_waker {
501 None => {
502 // Register the timer in the reactor.
503 let id = Reactor::get().insert_timer(*when, cx.waker());
504 this.id_and_waker = Some((id, cx.waker().clone()));
505 }
506 Some((id, w)) if !w.will_wake(cx.waker()) => {
507 // Deregister the timer from the reactor to remove the old waker.
508 Reactor::get().remove_timer(*when, *id);
509
510 // Register the timer in the reactor with the new waker.
511 let id = Reactor::get().insert_timer(*when, cx.waker());
512 this.id_and_waker = Some((id, cx.waker().clone()));
513 }
514 Some(_) => {}
515 }
516 }
517 }
518
519 Poll::Pending
520 }
521}
522
523/// Async adapter for I/O types.
524///
525/// This type puts an I/O handle into non-blocking mode, registers it in
526/// [epoll]/[kqueue]/[event ports]/[IOCP], and then provides an async interface for it.
527///
528/// [epoll]: https://en.wikipedia.org/wiki/Epoll
529/// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
530/// [event ports]: https://illumos.org/man/port_create
531/// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
532///
533/// # Caveats
534///
535/// [`Async`] is a low-level primitive, and as such it comes with some caveats.
536///
537/// For higher-level primitives built on top of [`Async`], look into [`async-net`] or
538/// [`async-process`] (on Unix).
539///
540/// The most notable caveat is that it is unsafe to access the inner I/O source mutably
/// using this primitive. Traits like [`AsyncRead`] and [`AsyncWrite`] are not implemented by
542/// default unless it is guaranteed that the resource won't be invalidated by reading or writing.
543/// See the [`IoSafe`] trait for more information.
544///
545/// [`async-net`]: https://github.com/smol-rs/async-net
546/// [`async-process`]: https://github.com/smol-rs/async-process
547/// [`AsyncRead`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncRead.html
548/// [`AsyncWrite`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncWrite.html
549///
550/// ### Supported types
551///
552/// [`Async`] supports all networking types, as well as some OS-specific file descriptors like
553/// [timerfd] and [inotify].
554///
555/// However, do not use [`Async`] with types like [`File`][`std::fs::File`],
556/// [`Stdin`][`std::io::Stdin`], [`Stdout`][`std::io::Stdout`], or [`Stderr`][`std::io::Stderr`]
557/// because all operating systems have issues with them when put in non-blocking mode.
558///
559/// [timerfd]: https://github.com/smol-rs/async-io/blob/master/examples/linux-timerfd.rs
560/// [inotify]: https://github.com/smol-rs/async-io/blob/master/examples/linux-inotify.rs
561///
562/// ### Concurrent I/O
563///
564/// Note that [`&Async<T>`][`Async`] implements [`AsyncRead`] and [`AsyncWrite`] if `&T`
565/// implements those traits, which means tasks can concurrently read and write using shared
566/// references.
567///
/// But there is a catch: only one task can read at a time, and only one task can write at a time. It
569/// is okay to have two tasks where one is reading and the other is writing at the same time, but
570/// it is not okay to have two tasks reading at the same time or writing at the same time. If you
571/// try to do that, conflicting tasks will just keep waking each other in turn, thus wasting CPU
572/// time.
573///
574/// Besides [`AsyncRead`] and [`AsyncWrite`], this caveat also applies to
575/// [`poll_readable()`][`Async::poll_readable()`] and
576/// [`poll_writable()`][`Async::poll_writable()`].
577///
578/// However, any number of tasks can be concurrently calling other methods like
579/// [`readable()`][`Async::readable()`] or [`read_with()`][`Async::read_with()`].
580///
581/// ### Closing
582///
583/// Closing the write side of [`Async`] with [`close()`][`futures_lite::AsyncWriteExt::close()`]
584/// simply flushes. If you want to shutdown a TCP or Unix socket, use
585/// [`Shutdown`][`std::net::Shutdown`].
586///
587/// # Examples
588///
589/// Connect to a server and echo incoming messages back to the server:
590///
591/// ```no_run
592/// use async_io::Async;
593/// use futures_lite::io;
594/// use std::net::TcpStream;
595///
596/// # futures_lite::future::block_on(async {
597/// // Connect to a local server.
598/// let stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 8000)).await?;
599///
600/// // Echo all messages from the read side of the stream into the write side.
601/// io::copy(&stream, &stream).await?;
602/// # std::io::Result::Ok(()) });
603/// ```
604///
605/// You can use either predefined async methods or wrap blocking I/O operations in
606/// [`Async::read_with()`], [`Async::read_with_mut()`], [`Async::write_with()`], and
607/// [`Async::write_with_mut()`]:
608///
609/// ```no_run
610/// use async_io::Async;
611/// use std::net::TcpListener;
612///
613/// # futures_lite::future::block_on(async {
614/// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
615///
616/// // These two lines are equivalent:
617/// let (stream, addr) = listener.accept().await?;
618/// let (stream, addr) = listener.read_with(|inner| inner.accept()).await?;
619/// # std::io::Result::Ok(()) });
620/// ```
621#[derive(Debug)]
pub struct Async<T> {
    /// A source registered in the reactor.
    ///
    /// Obtained from `Reactor::insert_io()` when the handle is created and handed back to
    /// `Reactor::remove_io()` by `into_inner()`.
    source: Arc<Source>,

    /// The inner I/O handle.
    ///
    /// Stored as `Some(..)` on construction; `into_inner()` takes the value out when it
    /// unwraps the handle. `get_ref()` and `get_mut()` unwrap this option.
    io: Option<T>,
}
629
// Unconditional impl: `Async<T>` is `Unpin` even when `T` itself is not.
impl<T> Unpin for Async<T> {}
631
632#[cfg(unix)]
633impl<T: AsFd> Async<T> {
634 /// Creates an async I/O handle.
635 ///
636 /// This method will put the handle in non-blocking mode and register it in
637 /// [epoll]/[kqueue]/[event ports]/[IOCP].
638 ///
639 /// On Unix systems, the handle must implement `AsFd`, while on Windows it must implement
640 /// `AsSocket`.
641 ///
642 /// [epoll]: https://en.wikipedia.org/wiki/Epoll
643 /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
644 /// [event ports]: https://illumos.org/man/port_create
645 /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
646 ///
647 /// # Examples
648 ///
649 /// ```
650 /// use async_io::Async;
651 /// use std::net::{SocketAddr, TcpListener};
652 ///
653 /// # futures_lite::future::block_on(async {
654 /// let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))?;
655 /// let listener = Async::new(listener)?;
656 /// # std::io::Result::Ok(()) });
657 /// ```
658 pub fn new(io: T) -> io::Result<Async<T>> {
659 // Put the file descriptor in non-blocking mode.
660 set_nonblocking(io.as_fd())?;
661
662 Self::new_nonblocking(io)
663 }
664
665 /// Creates an async I/O handle without setting it to non-blocking mode.
666 ///
667 /// This method will register the handle in [epoll]/[kqueue]/[event ports]/[IOCP].
668 ///
669 /// On Unix systems, the handle must implement `AsFd`, while on Windows it must implement
670 /// `AsSocket`.
671 ///
672 /// [epoll]: https://en.wikipedia.org/wiki/Epoll
673 /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
674 /// [event ports]: https://illumos.org/man/port_create
675 /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
676 ///
677 /// # Caveats
678 ///
679 /// The caller should ensure that the handle is set to non-blocking mode or that it is okay if
680 /// it is not set. If not set to non-blocking mode, I/O operations may block the current thread
681 /// and cause a deadlock in an asynchronous context.
682 pub fn new_nonblocking(io: T) -> io::Result<Async<T>> {
683 // SAFETY: It is impossible to drop the I/O source while it is registered through
684 // this type.
685 let registration = unsafe { Registration::new(io.as_fd()) };
686
687 Ok(Async {
688 source: Reactor::get().insert_io(registration)?,
689 io: Some(io),
690 })
691 }
692}
693
694#[cfg(unix)]
695impl<T: AsRawFd> AsRawFd for Async<T> {
696 fn as_raw_fd(&self) -> RawFd {
697 self.get_ref().as_raw_fd()
698 }
699}
700
701#[cfg(unix)]
702impl<T: AsFd> AsFd for Async<T> {
703 fn as_fd(&self) -> BorrowedFd<'_> {
704 self.get_ref().as_fd()
705 }
706}
707
708#[cfg(unix)]
709impl<T: AsFd + From<OwnedFd>> TryFrom<OwnedFd> for Async<T> {
710 type Error = io::Error;
711
712 fn try_from(value: OwnedFd) -> Result<Self, Self::Error> {
713 Async::new(value.into())
714 }
715}
716
717#[cfg(unix)]
718impl<T: Into<OwnedFd>> TryFrom<Async<T>> for OwnedFd {
719 type Error = io::Error;
720
721 fn try_from(value: Async<T>) -> Result<Self, Self::Error> {
722 value.into_inner().map(Into::into)
723 }
724}
725
726#[cfg(windows)]
727impl<T: AsSocket> Async<T> {
728 /// Creates an async I/O handle.
729 ///
730 /// This method will put the handle in non-blocking mode and register it in
731 /// [epoll]/[kqueue]/[event ports]/[IOCP].
732 ///
733 /// On Unix systems, the handle must implement `AsFd`, while on Windows it must implement
734 /// `AsSocket`.
735 ///
736 /// [epoll]: https://en.wikipedia.org/wiki/Epoll
737 /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
738 /// [event ports]: https://illumos.org/man/port_create
739 /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
740 ///
741 /// # Examples
742 ///
743 /// ```
744 /// use async_io::Async;
745 /// use std::net::{SocketAddr, TcpListener};
746 ///
747 /// # futures_lite::future::block_on(async {
748 /// let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))?;
749 /// let listener = Async::new(listener)?;
750 /// # std::io::Result::Ok(()) });
751 /// ```
752 pub fn new(io: T) -> io::Result<Async<T>> {
753 // Put the socket in non-blocking mode.
754 set_nonblocking(io.as_socket())?;
755
756 Self::new_nonblocking(io)
757 }
758
759 /// Creates an async I/O handle without setting it to non-blocking mode.
760 ///
761 /// This method will register the handle in [epoll]/[kqueue]/[event ports]/[IOCP].
762 ///
763 /// On Unix systems, the handle must implement `AsFd`, while on Windows it must implement
764 /// `AsSocket`.
765 ///
766 /// [epoll]: https://en.wikipedia.org/wiki/Epoll
767 /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
768 /// [event ports]: https://illumos.org/man/port_create
769 /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
770 ///
771 /// # Caveats
772 ///
773 /// The caller should ensure that the handle is set to non-blocking mode or that it is okay if
774 /// it is not set. If not set to non-blocking mode, I/O operations may block the current thread
775 /// and cause a deadlock in an asynchronous context.
776 pub fn new_nonblocking(io: T) -> io::Result<Async<T>> {
777 // Create the registration.
778 //
779 // SAFETY: It is impossible to drop the I/O source while it is registered through
780 // this type.
781 let registration = unsafe { Registration::new(io.as_socket()) };
782
783 Ok(Async {
784 source: Reactor::get().insert_io(registration)?,
785 io: Some(io),
786 })
787 }
788}
789
790#[cfg(windows)]
791impl<T: AsRawSocket> AsRawSocket for Async<T> {
792 fn as_raw_socket(&self) -> RawSocket {
793 self.get_ref().as_raw_socket()
794 }
795}
796
797#[cfg(windows)]
798impl<T: AsSocket> AsSocket for Async<T> {
799 fn as_socket(&self) -> BorrowedSocket<'_> {
800 self.get_ref().as_socket()
801 }
802}
803
804#[cfg(windows)]
805impl<T: AsSocket + From<OwnedSocket>> TryFrom<OwnedSocket> for Async<T> {
806 type Error = io::Error;
807
808 fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> {
809 Async::new(value.into())
810 }
811}
812
813#[cfg(windows)]
814impl<T: Into<OwnedSocket>> TryFrom<Async<T>> for OwnedSocket {
815 type Error = io::Error;
816
817 fn try_from(value: Async<T>) -> Result<Self, Self::Error> {
818 value.into_inner().map(Into::into)
819 }
820}
821
822impl<T> Async<T> {
823 /// Gets a reference to the inner I/O handle.
824 ///
825 /// # Examples
826 ///
827 /// ```
828 /// use async_io::Async;
829 /// use std::net::TcpListener;
830 ///
831 /// # futures_lite::future::block_on(async {
832 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
833 /// let inner = listener.get_ref();
834 /// # std::io::Result::Ok(()) });
835 /// ```
836 pub fn get_ref(&self) -> &T {
837 self.io.as_ref().unwrap()
838 }
839
840 /// Gets a mutable reference to the inner I/O handle.
841 ///
842 /// # Safety
843 ///
844 /// The underlying I/O source must not be dropped using this function.
845 ///
846 /// # Examples
847 ///
848 /// ```
849 /// use async_io::Async;
850 /// use std::net::TcpListener;
851 ///
852 /// # futures_lite::future::block_on(async {
853 /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
854 /// let inner = unsafe { listener.get_mut() };
855 /// # std::io::Result::Ok(()) });
856 /// ```
857 pub unsafe fn get_mut(&mut self) -> &mut T {
858 self.io.as_mut().unwrap()
859 }
860
861 /// Unwraps the inner I/O handle.
862 ///
863 /// This method will **not** put the I/O handle back into blocking mode.
864 ///
865 /// # Examples
866 ///
867 /// ```
868 /// use async_io::Async;
869 /// use std::net::TcpListener;
870 ///
871 /// # futures_lite::future::block_on(async {
872 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
873 /// let inner = listener.into_inner()?;
874 ///
875 /// // Put the listener back into blocking mode.
876 /// inner.set_nonblocking(false)?;
877 /// # std::io::Result::Ok(()) });
878 /// ```
879 pub fn into_inner(mut self) -> io::Result<T> {
880 let io = self.io.take().unwrap();
881 Reactor::get().remove_io(&self.source)?;
882 Ok(io)
883 }
884
885 /// Waits until the I/O handle is readable.
886 ///
887 /// This method completes when a read operation on this I/O handle wouldn't block.
888 ///
889 /// # Examples
890 ///
891 /// ```no_run
892 /// use async_io::Async;
893 /// use std::net::TcpListener;
894 ///
895 /// # futures_lite::future::block_on(async {
896 /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
897 ///
898 /// // Wait until a client can be accepted.
899 /// listener.readable().await?;
900 /// # std::io::Result::Ok(()) });
901 /// ```
902 pub fn readable(&self) -> Readable<'_, T> {
903 Source::readable(self)
904 }
905
906 /// Waits until the I/O handle is readable.
907 ///
908 /// This method completes when a read operation on this I/O handle wouldn't block.
909 pub fn readable_owned(self: Arc<Self>) -> ReadableOwned<T> {
910 Source::readable_owned(self)
911 }
912
913 /// Waits until the I/O handle is writable.
914 ///
915 /// This method completes when a write operation on this I/O handle wouldn't block.
916 ///
917 /// # Examples
918 ///
919 /// ```
920 /// use async_io::Async;
921 /// use std::net::{TcpStream, ToSocketAddrs};
922 ///
923 /// # futures_lite::future::block_on(async {
924 /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
925 /// let stream = Async::<TcpStream>::connect(addr).await?;
926 ///
927 /// // Wait until the stream is writable.
928 /// stream.writable().await?;
929 /// # std::io::Result::Ok(()) });
930 /// ```
931 pub fn writable(&self) -> Writable<'_, T> {
932 Source::writable(self)
933 }
934
935 /// Waits until the I/O handle is writable.
936 ///
937 /// This method completes when a write operation on this I/O handle wouldn't block.
938 pub fn writable_owned(self: Arc<Self>) -> WritableOwned<T> {
939 Source::writable_owned(self)
940 }
941
    /// Polls the I/O handle for readability.
    ///
    /// When this method returns [`Poll::Ready`], that means the OS has delivered an event
    /// indicating readability since the last time this task has called the method and received
    /// [`Poll::Pending`].
    ///
    /// # Caveats
    ///
    /// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
    /// will just keep waking each other in turn, thus wasting CPU time.
    ///
    /// Note that the [`AsyncRead`] implementation for [`Async`] also uses this method.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use async_io::Async;
    /// use std::future::poll_fn;
    /// use std::net::TcpListener;
    ///
    /// # futures_lite::future::block_on(async {
    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
    ///
    /// // Wait until a client can be accepted.
    /// poll_fn(|cx| listener.poll_readable(cx)).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Forward to the source held by this handle.
        self.source.poll_readable(cx)
    }
972
    /// Polls the I/O handle for writability.
    ///
    /// When this method returns [`Poll::Ready`], that means the OS has delivered an event
    /// indicating writability since the last time this task has called the method and received
    /// [`Poll::Pending`].
    ///
    /// # Caveats
    ///
    /// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
    /// will just keep waking each other in turn, thus wasting CPU time.
    ///
    /// Note that the [`AsyncWrite`] implementation for [`Async`] also uses this method.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_io::Async;
    /// use std::future::poll_fn;
    /// use std::net::{TcpStream, ToSocketAddrs};
    ///
    /// # futures_lite::future::block_on(async {
    /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
    /// let stream = Async::<TcpStream>::connect(addr).await?;
    ///
    /// // Wait until the stream is writable.
    /// poll_fn(|cx| stream.poll_writable(cx)).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Forward to the source held by this handle.
        self.source.poll_writable(cx)
    }
1004
1005 /// Performs a read operation asynchronously.
1006 ///
1007 /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
1008 /// invokes the `op` closure in a loop until it succeeds or returns an error other than
1009 /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
1010 /// sends a notification that the I/O handle is readable.
1011 ///
1012 /// The closure receives a shared reference to the I/O handle.
1013 ///
1014 /// # Examples
1015 ///
1016 /// ```no_run
1017 /// use async_io::Async;
1018 /// use std::net::TcpListener;
1019 ///
1020 /// # futures_lite::future::block_on(async {
1021 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
1022 ///
1023 /// // Accept a new client asynchronously.
1024 /// let (stream, addr) = listener.read_with(|l| l.accept()).await?;
1025 /// # std::io::Result::Ok(()) });
1026 /// ```
1027 pub async fn read_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
1028 let mut op = op;
1029 loop {
1030 match op(self.get_ref()) {
1031 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1032 res => return res,
1033 }
1034 optimistic(self.readable()).await?;
1035 }
1036 }
1037
1038 /// Performs a read operation asynchronously.
1039 ///
1040 /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
1041 /// invokes the `op` closure in a loop until it succeeds or returns an error other than
1042 /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
1043 /// sends a notification that the I/O handle is readable.
1044 ///
1045 /// The closure receives a mutable reference to the I/O handle.
1046 ///
1047 /// # Safety
1048 ///
1049 /// In the closure, the underlying I/O source must not be dropped.
1050 ///
1051 /// # Examples
1052 ///
1053 /// ```no_run
1054 /// use async_io::Async;
1055 /// use std::net::TcpListener;
1056 ///
1057 /// # futures_lite::future::block_on(async {
1058 /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
1059 ///
1060 /// // Accept a new client asynchronously.
1061 /// let (stream, addr) = unsafe { listener.read_with_mut(|l| l.accept()).await? };
1062 /// # std::io::Result::Ok(()) });
1063 /// ```
1064 pub async unsafe fn read_with_mut<R>(
1065 &mut self,
1066 op: impl FnMut(&mut T) -> io::Result<R>,
1067 ) -> io::Result<R> {
1068 let mut op = op;
1069 loop {
1070 match op(self.get_mut()) {
1071 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1072 res => return res,
1073 }
1074 optimistic(self.readable()).await?;
1075 }
1076 }
1077
1078 /// Performs a write operation asynchronously.
1079 ///
1080 /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
1081 /// invokes the `op` closure in a loop until it succeeds or returns an error other than
1082 /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
1083 /// sends a notification that the I/O handle is writable.
1084 ///
1085 /// The closure receives a shared reference to the I/O handle.
1086 ///
1087 /// # Examples
1088 ///
1089 /// ```no_run
1090 /// use async_io::Async;
1091 /// use std::net::UdpSocket;
1092 ///
1093 /// # futures_lite::future::block_on(async {
1094 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1095 /// socket.get_ref().connect("127.0.0.1:9000")?;
1096 ///
1097 /// let msg = b"hello";
1098 /// let len = socket.write_with(|s| s.send(msg)).await?;
1099 /// # std::io::Result::Ok(()) });
1100 /// ```
1101 pub async fn write_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
1102 let mut op = op;
1103 loop {
1104 match op(self.get_ref()) {
1105 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1106 res => return res,
1107 }
1108 optimistic(self.writable()).await?;
1109 }
1110 }
1111
1112 /// Performs a write operation asynchronously.
1113 ///
1114 /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
1115 /// invokes the `op` closure in a loop until it succeeds or returns an error other than
1116 /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
1117 /// sends a notification that the I/O handle is writable.
1118 ///
1119 /// # Safety
1120 ///
1121 /// The closure receives a mutable reference to the I/O handle. In the closure, the underlying
1122 /// I/O source must not be dropped.
1123 ///
1124 /// # Examples
1125 ///
1126 /// ```no_run
1127 /// use async_io::Async;
1128 /// use std::net::UdpSocket;
1129 ///
1130 /// # futures_lite::future::block_on(async {
1131 /// let mut socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1132 /// socket.get_ref().connect("127.0.0.1:9000")?;
1133 ///
1134 /// let msg = b"hello";
1135 /// let len = unsafe { socket.write_with_mut(|s| s.send(msg)).await? };
1136 /// # std::io::Result::Ok(()) });
1137 /// ```
1138 pub async unsafe fn write_with_mut<R>(
1139 &mut self,
1140 op: impl FnMut(&mut T) -> io::Result<R>,
1141 ) -> io::Result<R> {
1142 let mut op = op;
1143 loop {
1144 match op(self.get_mut()) {
1145 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1146 res => return res,
1147 }
1148 optimistic(self.writable()).await?;
1149 }
1150 }
1151}
1152
/// Borrows the wrapped I/O handle, equivalent to calling `get_ref()` on the [`Async`].
impl<T> AsRef<T> for Async<T> {
    fn as_ref(&self) -> &T {
        // Shared access only, so this does not need to be `unsafe`.
        self.get_ref()
    }
}
1158
1159impl<T> Drop for Async<T> {
1160 fn drop(&mut self) {
1161 if self.io.is_some() {
1162 // Deregister and ignore errors because destructors should not panic.
1163 Reactor::get().remove_io(&self.source).ok();
1164
1165 // Drop the I/O handle to close it.
1166 self.io.take();
1167 }
1168 }
1169}
1170
/// Types whose I/O trait implementations do not drop the underlying I/O source.
///
/// The resource contained inside of the [`Async`] cannot be invalidated. This invalidation can
/// happen if the inner resource (the [`TcpStream`], [`UnixListener`] or other `T`) is moved out
/// and dropped before the [`Async`]. Because of this, functions that grant mutable access to
/// the inner type are unsafe, as there is no way to guarantee that the source won't be dropped
/// and a dangling handle won't be left behind.
///
/// Unfortunately this extends to implementations of [`Read`] and [`Write`]. Since methods on those
/// traits take `&mut`, there is no guarantee that the implementor of those traits won't move the
/// source out while the method is being run.
///
/// This trait is an antidote to this predicament. By implementing this trait, the user pledges
/// that using any I/O traits won't destroy the source. This way, [`Async`] can implement the
/// `async` version of these I/O traits, like [`AsyncRead`] and [`AsyncWrite`].
///
/// # Safety
///
/// Any I/O trait implementations for this type must not drop the underlying I/O source. Traits
/// affected by this trait include [`Read`], [`Write`], [`Seek`] and [`BufRead`].
///
/// This trait is implemented by default for the relevant `libstd` types. In addition, it is
/// implemented for immutable reference types, as it is impossible to invalidate any outstanding
/// references while holding an immutable reference, even with interior mutability. As Rust's
/// current pinning system relies on similar guarantees, I believe that this approach is robust.
///
/// [`BufRead`]: https://doc.rust-lang.org/std/io/trait.BufRead.html
/// [`Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
/// [`Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html
/// [`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
///
/// [`AsyncRead`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncRead.html
/// [`AsyncWrite`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncWrite.html
pub unsafe trait IoSafe {}
1205
/// Reference types can't be mutated.
///
/// The worst thing that can happen is that external state is used to change which descriptor
/// `as_fd()` returns. For instance:
///
/// ```
/// # #[cfg(unix)] {
/// use std::cell::Cell;
/// use std::net::TcpStream;
/// use std::os::unix::io::{AsFd, BorrowedFd};
///
/// struct Bar {
///     flag: Cell<bool>,
///     a: TcpStream,
///     b: TcpStream
/// }
///
/// impl AsFd for Bar {
///     fn as_fd(&self) -> BorrowedFd<'_> {
///         if self.flag.replace(!self.flag.get()) {
///             self.a.as_fd()
///         } else {
///             self.b.as_fd()
///         }
///     }
/// }
/// # }
/// ```
///
/// We solve this problem by only calling `as_fd()` once to get the original source. Implementations
/// like this are considered buggy (but not unsound) and are thus not really supported by `async-io`.
// SAFETY: a shared reference cannot be used to move the referent out or to drop it.
unsafe impl<T: ?Sized> IoSafe for &T {}
1238
// Can be implemented on top of libstd types.
//
// SAFETY (applies to every impl in this group): these impls assert that the `libstd` I/O trait
// implementations of the listed types do not drop the underlying I/O source.
unsafe impl IoSafe for std::fs::File {}
unsafe impl IoSafe for std::io::Stderr {}
unsafe impl IoSafe for std::io::Stdin {}
unsafe impl IoSafe for std::io::Stdout {}
unsafe impl IoSafe for std::io::StderrLock<'_> {}
unsafe impl IoSafe for std::io::StdinLock<'_> {}
unsafe impl IoSafe for std::io::StdoutLock<'_> {}
unsafe impl IoSafe for std::net::TcpStream {}
unsafe impl IoSafe for std::process::ChildStdin {}
unsafe impl IoSafe for std::process::ChildStdout {}
unsafe impl IoSafe for std::process::ChildStderr {}

// The Unix stream socket type only exists on Unix targets.
#[cfg(unix)]
unsafe impl IoSafe for std::os::unix::net::UnixStream {}

// PipeReader & PipeWriter require std >= 1.87, our MSRV is 1.71, hence
// conditional on cfg()s, generated from build.rs
#[cfg(not(async_io_no_pipe))]
unsafe impl IoSafe for std::io::PipeReader {}
#[cfg(not(async_io_no_pipe))]
unsafe impl IoSafe for std::io::PipeWriter {}

// Wrapper and pointer types: each impl is conditional on the wrapped type being `IoSafe` itself.
unsafe impl<T: IoSafe + Read> IoSafe for std::io::BufReader<T> {}
unsafe impl<T: IoSafe + Write> IoSafe for std::io::BufWriter<T> {}
unsafe impl<T: IoSafe + Write> IoSafe for std::io::LineWriter<T> {}
unsafe impl<T: IoSafe + ?Sized> IoSafe for &mut T {}
unsafe impl<T: IoSafe + ?Sized> IoSafe for Box<T> {}
unsafe impl<T: Clone + IoSafe> IoSafe for std::borrow::Cow<'_, T> {}
1268
1269impl<T: IoSafe + Read> AsyncRead for Async<T> {
1270 fn poll_read(
1271 mut self: Pin<&mut Self>,
1272 cx: &mut Context<'_>,
1273 buf: &mut [u8],
1274 ) -> Poll<io::Result<usize>> {
1275 loop {
1276 match unsafe { (*self).get_mut() }.read(buf) {
1277 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1278 res => return Poll::Ready(res),
1279 }
1280 ready!(self.poll_readable(cx))?;
1281 }
1282 }
1283
1284 fn poll_read_vectored(
1285 mut self: Pin<&mut Self>,
1286 cx: &mut Context<'_>,
1287 bufs: &mut [IoSliceMut<'_>],
1288 ) -> Poll<io::Result<usize>> {
1289 loop {
1290 match unsafe { (*self).get_mut() }.read_vectored(bufs) {
1291 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1292 res => return Poll::Ready(res),
1293 }
1294 ready!(self.poll_readable(cx))?;
1295 }
1296 }
1297}
1298
1299// Since this is through a reference, we can't mutate the inner I/O source.
1300// Therefore this is safe!
1301impl<T> AsyncRead for &Async<T>
1302where
1303 for<'a> &'a T: Read,
1304{
1305 fn poll_read(
1306 self: Pin<&mut Self>,
1307 cx: &mut Context<'_>,
1308 buf: &mut [u8],
1309 ) -> Poll<io::Result<usize>> {
1310 loop {
1311 match (*self).get_ref().read(buf) {
1312 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1313 res => return Poll::Ready(res),
1314 }
1315 ready!(self.poll_readable(cx))?;
1316 }
1317 }
1318
1319 fn poll_read_vectored(
1320 self: Pin<&mut Self>,
1321 cx: &mut Context<'_>,
1322 bufs: &mut [IoSliceMut<'_>],
1323 ) -> Poll<io::Result<usize>> {
1324 loop {
1325 match (*self).get_ref().read_vectored(bufs) {
1326 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1327 res => return Poll::Ready(res),
1328 }
1329 ready!(self.poll_readable(cx))?;
1330 }
1331 }
1332}
1333
1334impl<T: IoSafe + Write> AsyncWrite for Async<T> {
1335 fn poll_write(
1336 mut self: Pin<&mut Self>,
1337 cx: &mut Context<'_>,
1338 buf: &[u8],
1339 ) -> Poll<io::Result<usize>> {
1340 loop {
1341 match unsafe { (*self).get_mut() }.write(buf) {
1342 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1343 res => return Poll::Ready(res),
1344 }
1345 ready!(self.poll_writable(cx))?;
1346 }
1347 }
1348
1349 fn poll_write_vectored(
1350 mut self: Pin<&mut Self>,
1351 cx: &mut Context<'_>,
1352 bufs: &[IoSlice<'_>],
1353 ) -> Poll<io::Result<usize>> {
1354 loop {
1355 match unsafe { (*self).get_mut() }.write_vectored(bufs) {
1356 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1357 res => return Poll::Ready(res),
1358 }
1359 ready!(self.poll_writable(cx))?;
1360 }
1361 }
1362
1363 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1364 loop {
1365 match unsafe { (*self).get_mut() }.flush() {
1366 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1367 res => return Poll::Ready(res),
1368 }
1369 ready!(self.poll_writable(cx))?;
1370 }
1371 }
1372
1373 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1374 self.poll_flush(cx)
1375 }
1376}
1377
1378impl<T> AsyncWrite for &Async<T>
1379where
1380 for<'a> &'a T: Write,
1381{
1382 fn poll_write(
1383 self: Pin<&mut Self>,
1384 cx: &mut Context<'_>,
1385 buf: &[u8],
1386 ) -> Poll<io::Result<usize>> {
1387 loop {
1388 match (*self).get_ref().write(buf) {
1389 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1390 res => return Poll::Ready(res),
1391 }
1392 ready!(self.poll_writable(cx))?;
1393 }
1394 }
1395
1396 fn poll_write_vectored(
1397 self: Pin<&mut Self>,
1398 cx: &mut Context<'_>,
1399 bufs: &[IoSlice<'_>],
1400 ) -> Poll<io::Result<usize>> {
1401 loop {
1402 match (*self).get_ref().write_vectored(bufs) {
1403 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1404 res => return Poll::Ready(res),
1405 }
1406 ready!(self.poll_writable(cx))?;
1407 }
1408 }
1409
1410 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1411 loop {
1412 match (*self).get_ref().flush() {
1413 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1414 res => return Poll::Ready(res),
1415 }
1416 ready!(self.poll_writable(cx))?;
1417 }
1418 }
1419
1420 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1421 self.poll_flush(cx)
1422 }
1423}
1424
1425impl Async<TcpListener> {
1426 /// Creates a TCP listener bound to the specified address.
1427 ///
1428 /// Binding with port number 0 will request an available port from the OS.
1429 ///
1430 /// # Examples
1431 ///
1432 /// ```
1433 /// use async_io::Async;
1434 /// use std::net::TcpListener;
1435 ///
1436 /// # futures_lite::future::block_on(async {
1437 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
1438 /// println!("Listening on {}", listener.get_ref().local_addr()?);
1439 /// # std::io::Result::Ok(()) });
1440 /// ```
1441 pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpListener>> {
1442 let addr = addr.into();
1443 Async::new(TcpListener::bind(addr)?)
1444 }
1445
1446 /// Accepts a new incoming TCP connection.
1447 ///
1448 /// When a connection is established, it will be returned as a TCP stream together with its
1449 /// remote address.
1450 ///
1451 /// # Examples
1452 ///
1453 /// ```no_run
1454 /// use async_io::Async;
1455 /// use std::net::TcpListener;
1456 ///
1457 /// # futures_lite::future::block_on(async {
1458 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?;
1459 /// let (stream, addr) = listener.accept().await?;
1460 /// println!("Accepted client: {}", addr);
1461 /// # std::io::Result::Ok(()) });
1462 /// ```
1463 pub async fn accept(&self) -> io::Result<(Async<TcpStream>, SocketAddr)> {
1464 let (stream, addr) = self.read_with(|io| io.accept()).await?;
1465 Ok((Async::new(stream)?, addr))
1466 }
1467
1468 /// Returns a stream of incoming TCP connections.
1469 ///
1470 /// The stream is infinite, i.e. it never stops with a [`None`].
1471 ///
1472 /// # Examples
1473 ///
1474 /// ```no_run
1475 /// use async_io::Async;
1476 /// use futures_lite::{stream::StreamExt};
1477 /// use std::net::TcpListener;
1478 /// use std::pin::pin;
1479 ///
1480 /// # futures_lite::future::block_on(async {
1481 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?;
1482 /// let incoming = listener.incoming();
1483 /// let mut incoming = pin!(incoming);
1484 ///
1485 /// while let Some(stream) = incoming.next().await {
1486 /// let stream = stream?;
1487 /// println!("Accepted client: {}", stream.get_ref().peer_addr()?);
1488 /// }
1489 /// # std::io::Result::Ok(()) });
1490 /// ```
1491 pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<TcpStream>>> + Send + '_ {
1492 stream::unfold(self, |listener| async move {
1493 let res = listener.accept().await.map(|(stream, _)| stream);
1494 Some((res, listener))
1495 })
1496 }
1497}
1498
/// Converts a standard TCP listener into an [`Async`] one by calling [`Async::new()`].
impl TryFrom<std::net::TcpListener> for Async<std::net::TcpListener> {
    type Error = io::Error;

    fn try_from(listener: std::net::TcpListener) -> io::Result<Self> {
        // Any error from `Async::new` is returned unchanged.
        Async::new(listener)
    }
}
1506
1507impl Async<TcpStream> {
1508 /// Creates a TCP connection to the specified address.
1509 ///
1510 /// # Examples
1511 ///
1512 /// ```
1513 /// use async_io::Async;
1514 /// use std::net::{TcpStream, ToSocketAddrs};
1515 ///
1516 /// # futures_lite::future::block_on(async {
1517 /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
1518 /// let stream = Async::<TcpStream>::connect(addr).await?;
1519 /// # std::io::Result::Ok(()) });
1520 /// ```
1521 pub async fn connect<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpStream>> {
1522 // Figure out how to handle this address.
1523 let addr = addr.into();
1524 let (domain, sock_addr) = match addr {
1525 SocketAddr::V4(v4) => (rn::AddressFamily::INET, v4.as_any()),
1526 SocketAddr::V6(v6) => (rn::AddressFamily::INET6, v6.as_any()),
1527 };
1528
1529 // Begin async connect.
1530 let socket = connect(sock_addr, domain, Some(rn::ipproto::TCP))?;
1531 // Use new_nonblocking because connect already sets socket to non-blocking mode.
1532 let stream = Async::new_nonblocking(TcpStream::from(socket))?;
1533
1534 // The stream becomes writable when connected.
1535 stream.writable().await?;
1536
1537 // Check if there was an error while connecting.
1538 match stream.get_ref().take_error()? {
1539 None => Ok(stream),
1540 Some(err) => Err(err),
1541 }
1542 }
1543
1544 /// Reads data from the stream without removing it from the buffer.
1545 ///
1546 /// Returns the number of bytes read. Successive calls of this method read the same data.
1547 ///
1548 /// # Examples
1549 ///
1550 /// ```
1551 /// use async_io::Async;
1552 /// use futures_lite::{io::AsyncWriteExt, stream::StreamExt};
1553 /// use std::net::{TcpStream, ToSocketAddrs};
1554 ///
1555 /// # futures_lite::future::block_on(async {
1556 /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
1557 /// let mut stream = Async::<TcpStream>::connect(addr).await?;
1558 ///
1559 /// stream
1560 /// .write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
1561 /// .await?;
1562 ///
1563 /// let mut buf = [0u8; 1024];
1564 /// let len = stream.peek(&mut buf).await?;
1565 /// # std::io::Result::Ok(()) });
1566 /// ```
1567 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1568 self.read_with(|io| io.peek(buf)).await
1569 }
1570}
1571
/// Converts a standard TCP stream into an [`Async`] one by calling [`Async::new()`].
impl TryFrom<std::net::TcpStream> for Async<std::net::TcpStream> {
    type Error = io::Error;

    fn try_from(stream: std::net::TcpStream) -> io::Result<Self> {
        // Any error from `Async::new` is returned unchanged.
        Async::new(stream)
    }
}
1579
1580impl Async<UdpSocket> {
1581 /// Creates a UDP socket bound to the specified address.
1582 ///
1583 /// Binding with port number 0 will request an available port from the OS.
1584 ///
1585 /// # Examples
1586 ///
1587 /// ```
1588 /// use async_io::Async;
1589 /// use std::net::UdpSocket;
1590 ///
1591 /// # futures_lite::future::block_on(async {
1592 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
1593 /// println!("Bound to {}", socket.get_ref().local_addr()?);
1594 /// # std::io::Result::Ok(()) });
1595 /// ```
1596 pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<UdpSocket>> {
1597 let addr = addr.into();
1598 Async::new(UdpSocket::bind(addr)?)
1599 }
1600
1601 /// Receives a single datagram message.
1602 ///
1603 /// Returns the number of bytes read and the address the message came from.
1604 ///
1605 /// This method must be called with a valid byte slice of sufficient size to hold the message.
1606 /// If the message is too long to fit, excess bytes may get discarded.
1607 ///
1608 /// # Examples
1609 ///
1610 /// ```no_run
1611 /// use async_io::Async;
1612 /// use std::net::UdpSocket;
1613 ///
1614 /// # futures_lite::future::block_on(async {
1615 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1616 ///
1617 /// let mut buf = [0u8; 1024];
1618 /// let (len, addr) = socket.recv_from(&mut buf).await?;
1619 /// # std::io::Result::Ok(()) });
1620 /// ```
1621 pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1622 self.read_with(|io| io.recv_from(buf)).await
1623 }
1624
1625 /// Receives a single datagram message without removing it from the queue.
1626 ///
1627 /// Returns the number of bytes read and the address the message came from.
1628 ///
1629 /// This method must be called with a valid byte slice of sufficient size to hold the message.
1630 /// If the message is too long to fit, excess bytes may get discarded.
1631 ///
1632 /// # Examples
1633 ///
1634 /// ```no_run
1635 /// use async_io::Async;
1636 /// use std::net::UdpSocket;
1637 ///
1638 /// # futures_lite::future::block_on(async {
1639 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1640 ///
1641 /// let mut buf = [0u8; 1024];
1642 /// let (len, addr) = socket.peek_from(&mut buf).await?;
1643 /// # std::io::Result::Ok(()) });
1644 /// ```
1645 pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1646 self.read_with(|io| io.peek_from(buf)).await
1647 }
1648
1649 /// Sends data to the specified address.
1650 ///
1651 /// Returns the number of bytes written.
1652 ///
1653 /// # Examples
1654 ///
1655 /// ```no_run
1656 /// use async_io::Async;
1657 /// use std::net::UdpSocket;
1658 ///
1659 /// # futures_lite::future::block_on(async {
1660 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
1661 /// let addr = socket.get_ref().local_addr()?;
1662 ///
1663 /// let msg = b"hello";
1664 /// let len = socket.send_to(msg, addr).await?;
1665 /// # std::io::Result::Ok(()) });
1666 /// ```
1667 pub async fn send_to<A: Into<SocketAddr>>(&self, buf: &[u8], addr: A) -> io::Result<usize> {
1668 let addr = addr.into();
1669 self.write_with(|io| io.send_to(buf, addr)).await
1670 }
1671
1672 /// Receives a single datagram message from the connected peer.
1673 ///
1674 /// Returns the number of bytes read.
1675 ///
1676 /// This method must be called with a valid byte slice of sufficient size to hold the message.
1677 /// If the message is too long to fit, excess bytes may get discarded.
1678 ///
1679 /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1680 /// This method will fail if the socket is not connected.
1681 ///
1682 /// # Examples
1683 ///
1684 /// ```no_run
1685 /// use async_io::Async;
1686 /// use std::net::UdpSocket;
1687 ///
1688 /// # futures_lite::future::block_on(async {
1689 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1690 /// socket.get_ref().connect("127.0.0.1:9000")?;
1691 ///
1692 /// let mut buf = [0u8; 1024];
1693 /// let len = socket.recv(&mut buf).await?;
1694 /// # std::io::Result::Ok(()) });
1695 /// ```
1696 pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
1697 self.read_with(|io| io.recv(buf)).await
1698 }
1699
1700 /// Receives a single datagram message from the connected peer without removing it from the
1701 /// queue.
1702 ///
1703 /// Returns the number of bytes read and the address the message came from.
1704 ///
1705 /// This method must be called with a valid byte slice of sufficient size to hold the message.
1706 /// If the message is too long to fit, excess bytes may get discarded.
1707 ///
1708 /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1709 /// This method will fail if the socket is not connected.
1710 ///
1711 /// # Examples
1712 ///
1713 /// ```no_run
1714 /// use async_io::Async;
1715 /// use std::net::UdpSocket;
1716 ///
1717 /// # futures_lite::future::block_on(async {
1718 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1719 /// socket.get_ref().connect("127.0.0.1:9000")?;
1720 ///
1721 /// let mut buf = [0u8; 1024];
1722 /// let len = socket.peek(&mut buf).await?;
1723 /// # std::io::Result::Ok(()) });
1724 /// ```
1725 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1726 self.read_with(|io| io.peek(buf)).await
1727 }
1728
1729 /// Sends data to the connected peer.
1730 ///
1731 /// Returns the number of bytes written.
1732 ///
1733 /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1734 /// This method will fail if the socket is not connected.
1735 ///
1736 /// # Examples
1737 ///
1738 /// ```no_run
1739 /// use async_io::Async;
1740 /// use std::net::UdpSocket;
1741 ///
1742 /// # futures_lite::future::block_on(async {
1743 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1744 /// socket.get_ref().connect("127.0.0.1:9000")?;
1745 ///
1746 /// let msg = b"hello";
1747 /// let len = socket.send(msg).await?;
1748 /// # std::io::Result::Ok(()) });
1749 /// ```
1750 pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
1751 self.write_with(|io| io.send(buf)).await
1752 }
1753}
1754
/// Converts a standard UDP socket into an [`Async`] one by calling [`Async::new()`].
impl TryFrom<std::net::UdpSocket> for Async<std::net::UdpSocket> {
    type Error = io::Error;

    fn try_from(socket: std::net::UdpSocket) -> io::Result<Self> {
        // Any error from `Async::new` is returned unchanged.
        Async::new(socket)
    }
}
1762
1763#[cfg(unix)]
1764impl Async<UnixListener> {
1765 /// Creates a UDS listener bound to the specified path.
1766 ///
1767 /// # Examples
1768 ///
1769 /// ```no_run
1770 /// use async_io::Async;
1771 /// use std::os::unix::net::UnixListener;
1772 ///
1773 /// # futures_lite::future::block_on(async {
1774 /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1775 /// println!("Listening on {:?}", listener.get_ref().local_addr()?);
1776 /// # std::io::Result::Ok(()) });
1777 /// ```
1778 pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixListener>> {
1779 let path = path.as_ref().to_owned();
1780 Async::new(UnixListener::bind(path)?)
1781 }
1782
1783 /// Accepts a new incoming UDS stream connection.
1784 ///
1785 /// When a connection is established, it will be returned as a stream together with its remote
1786 /// address.
1787 ///
1788 /// # Examples
1789 ///
1790 /// ```no_run
1791 /// use async_io::Async;
1792 /// use std::os::unix::net::UnixListener;
1793 ///
1794 /// # futures_lite::future::block_on(async {
1795 /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1796 /// let (stream, addr) = listener.accept().await?;
1797 /// println!("Accepted client: {:?}", addr);
1798 /// # std::io::Result::Ok(()) });
1799 /// ```
1800 pub async fn accept(&self) -> io::Result<(Async<UnixStream>, UnixSocketAddr)> {
1801 let (stream, addr) = self.read_with(|io| io.accept()).await?;
1802 Ok((Async::new(stream)?, addr))
1803 }
1804
1805 /// Returns a stream of incoming UDS connections.
1806 ///
1807 /// The stream is infinite, i.e. it never stops with a [`None`] item.
1808 ///
1809 /// # Examples
1810 ///
1811 /// ```no_run
1812 /// use async_io::Async;
1813 /// use futures_lite::stream::StreamExt;
1814 /// use std::os::unix::net::UnixListener;
1815 /// use std::pin::pin;
1816 ///
1817 /// # futures_lite::future::block_on(async {
1818 /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1819 /// let incoming = listener.incoming();
1820 /// let mut incoming = pin!(incoming);
1821 ///
1822 /// while let Some(stream) = incoming.next().await {
1823 /// let stream = stream?;
1824 /// println!("Accepted client: {:?}", stream.get_ref().peer_addr()?);
1825 /// }
1826 /// # std::io::Result::Ok(()) });
1827 /// ```
1828 pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<UnixStream>>> + Send + '_ {
1829 stream::unfold(self, |listener| async move {
1830 let res = listener.accept().await.map(|(stream, _)| stream);
1831 Some((res, listener))
1832 })
1833 }
1834}
1835
/// Converts a standard UDS listener into an [`Async`] one by calling [`Async::new()`].
#[cfg(unix)]
impl TryFrom<std::os::unix::net::UnixListener> for Async<std::os::unix::net::UnixListener> {
    type Error = io::Error;

    fn try_from(listener: std::os::unix::net::UnixListener) -> io::Result<Self> {
        // Any error from `Async::new` is returned unchanged.
        Async::new(listener)
    }
}
1844
1845#[cfg(unix)]
1846impl Async<UnixStream> {
1847 /// Creates a UDS stream connected to the specified path.
1848 ///
1849 /// # Examples
1850 ///
1851 /// ```no_run
1852 /// use async_io::Async;
1853 /// use std::os::unix::net::UnixStream;
1854 ///
1855 /// # futures_lite::future::block_on(async {
1856 /// let stream = Async::<UnixStream>::connect("/tmp/socket").await?;
1857 /// # std::io::Result::Ok(()) });
1858 /// ```
1859 pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixStream>> {
1860 let address = convert_path_to_socket_address(path.as_ref())?;
1861
1862 // Begin async connect.
1863 let socket = connect(address.into(), rn::AddressFamily::UNIX, None)?;
1864 // Use new_nonblocking because connect already sets socket to non-blocking mode.
1865 let stream = Async::new_nonblocking(UnixStream::from(socket))?;
1866
1867 // The stream becomes writable when connected.
1868 stream.writable().await?;
1869
1870 // On Linux, it appears the socket may become writable even when connecting fails, so we
1871 // must do an extra check here and see if the peer address is retrievable.
1872 stream.get_ref().peer_addr()?;
1873 Ok(stream)
1874 }
1875
1876 /// Creates an unnamed pair of connected UDS stream sockets.
1877 ///
1878 /// # Examples
1879 ///
1880 /// ```no_run
1881 /// use async_io::Async;
1882 /// use std::os::unix::net::UnixStream;
1883 ///
1884 /// # futures_lite::future::block_on(async {
1885 /// let (stream1, stream2) = Async::<UnixStream>::pair()?;
1886 /// # std::io::Result::Ok(()) });
1887 /// ```
1888 pub fn pair() -> io::Result<(Async<UnixStream>, Async<UnixStream>)> {
1889 let (stream1, stream2) = UnixStream::pair()?;
1890 Ok((Async::new(stream1)?, Async::new(stream2)?))
1891 }
1892}
1893
1894#[cfg(unix)]
1895impl TryFrom<std::os::unix::net::UnixStream> for Async<std::os::unix::net::UnixStream> {
1896 type Error = io::Error;
1897
1898 fn try_from(stream: std::os::unix::net::UnixStream) -> io::Result<Self> {
1899 Async::new(stream)
1900 }
1901}
1902
1903#[cfg(unix)]
1904impl Async<UnixDatagram> {
1905 /// Creates a UDS datagram socket bound to the specified path.
1906 ///
1907 /// # Examples
1908 ///
1909 /// ```no_run
1910 /// use async_io::Async;
1911 /// use std::os::unix::net::UnixDatagram;
1912 ///
1913 /// # futures_lite::future::block_on(async {
1914 /// let socket = Async::<UnixDatagram>::bind("/tmp/socket")?;
1915 /// # std::io::Result::Ok(()) });
1916 /// ```
1917 pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixDatagram>> {
1918 let path = path.as_ref().to_owned();
1919 Async::new(UnixDatagram::bind(path)?)
1920 }
1921
1922 /// Creates a UDS datagram socket not bound to any address.
1923 ///
1924 /// # Examples
1925 ///
1926 /// ```no_run
1927 /// use async_io::Async;
1928 /// use std::os::unix::net::UnixDatagram;
1929 ///
1930 /// # futures_lite::future::block_on(async {
1931 /// let socket = Async::<UnixDatagram>::unbound()?;
1932 /// # std::io::Result::Ok(()) });
1933 /// ```
1934 pub fn unbound() -> io::Result<Async<UnixDatagram>> {
1935 Async::new(UnixDatagram::unbound()?)
1936 }
1937
1938 /// Creates an unnamed pair of connected Unix datagram sockets.
1939 ///
1940 /// # Examples
1941 ///
1942 /// ```no_run
1943 /// use async_io::Async;
1944 /// use std::os::unix::net::UnixDatagram;
1945 ///
1946 /// # futures_lite::future::block_on(async {
1947 /// let (socket1, socket2) = Async::<UnixDatagram>::pair()?;
1948 /// # std::io::Result::Ok(()) });
1949 /// ```
1950 pub fn pair() -> io::Result<(Async<UnixDatagram>, Async<UnixDatagram>)> {
1951 let (socket1, socket2) = UnixDatagram::pair()?;
1952 Ok((Async::new(socket1)?, Async::new(socket2)?))
1953 }
1954
1955 /// Receives data from the socket.
1956 ///
1957 /// Returns the number of bytes read and the address the message came from.
1958 ///
1959 /// # Examples
1960 ///
1961 /// ```no_run
1962 /// use async_io::Async;
1963 /// use std::os::unix::net::UnixDatagram;
1964 ///
1965 /// # futures_lite::future::block_on(async {
1966 /// let socket = Async::<UnixDatagram>::bind("/tmp/socket")?;
1967 ///
1968 /// let mut buf = [0u8; 1024];
1969 /// let (len, addr) = socket.recv_from(&mut buf).await?;
1970 /// # std::io::Result::Ok(()) });
1971 /// ```
1972 pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> {
1973 self.read_with(|io| io.recv_from(buf)).await
1974 }
1975
1976 /// Sends data to the specified address.
1977 ///
1978 /// Returns the number of bytes written.
1979 ///
1980 /// # Examples
1981 ///
1982 /// ```no_run
1983 /// use async_io::Async;
1984 /// use std::os::unix::net::UnixDatagram;
1985 ///
1986 /// # futures_lite::future::block_on(async {
1987 /// let socket = Async::<UnixDatagram>::unbound()?;
1988 ///
1989 /// let msg = b"hello";
1990 /// let addr = "/tmp/socket";
1991 /// let len = socket.send_to(msg, addr).await?;
1992 /// # std::io::Result::Ok(()) });
1993 /// ```
1994 pub async fn send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize> {
1995 self.write_with(|io| io.send_to(buf, &path)).await
1996 }
1997
1998 /// Receives data from the connected peer.
1999 ///
2000 /// Returns the number of bytes read and the address the message came from.
2001 ///
2002 /// The [`connect`][`UnixDatagram::connect()`] method connects this socket to a remote address.
2003 /// This method will fail if the socket is not connected.
2004 ///
2005 /// # Examples
2006 ///
2007 /// ```no_run
2008 /// use async_io::Async;
2009 /// use std::os::unix::net::UnixDatagram;
2010 ///
2011 /// # futures_lite::future::block_on(async {
2012 /// let socket = Async::<UnixDatagram>::bind("/tmp/socket1")?;
2013 /// socket.get_ref().connect("/tmp/socket2")?;
2014 ///
2015 /// let mut buf = [0u8; 1024];
2016 /// let len = socket.recv(&mut buf).await?;
2017 /// # std::io::Result::Ok(()) });
2018 /// ```
2019 pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
2020 self.read_with(|io| io.recv(buf)).await
2021 }
2022
2023 /// Sends data to the connected peer.
2024 ///
2025 /// Returns the number of bytes written.
2026 ///
2027 /// The [`connect`][`UnixDatagram::connect()`] method connects this socket to a remote address.
2028 /// This method will fail if the socket is not connected.
2029 ///
2030 /// # Examples
2031 ///
2032 /// ```no_run
2033 /// use async_io::Async;
2034 /// use std::os::unix::net::UnixDatagram;
2035 ///
2036 /// # futures_lite::future::block_on(async {
2037 /// let socket = Async::<UnixDatagram>::bind("/tmp/socket1")?;
2038 /// socket.get_ref().connect("/tmp/socket2")?;
2039 ///
2040 /// let msg = b"hello";
2041 /// let len = socket.send(msg).await?;
2042 /// # std::io::Result::Ok(()) });
2043 /// ```
2044 pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
2045 self.write_with(|io| io.send(buf)).await
2046 }
2047}
2048
2049#[cfg(unix)]
2050impl TryFrom<std::os::unix::net::UnixDatagram> for Async<std::os::unix::net::UnixDatagram> {
2051 type Error = io::Error;
2052
2053 fn try_from(socket: std::os::unix::net::UnixDatagram) -> io::Result<Self> {
2054 Async::new(socket)
2055 }
2056}
2057
/// Drives `fut` with a single poll and, if it was not ready, reports success on the next poll.
///
/// The first poll forwards whatever `fut` returns, including `Poll::Pending` (in which case
/// `fut` has registered the waker). Every later poll returns `Ok(())` without touching `fut`
/// again, on the optimistic assumption that the wakeup meant the operation is now ready.
async fn optimistic(fut: impl Future<Output = io::Result<()>>) -> io::Result<()> {
    let mut fut = pin!(fut);
    let mut already_polled = false;

    poll_fn(|cx| {
        // `replace` records that a poll happened and yields the previous state of the flag.
        if std::mem::replace(&mut already_polled, true) {
            return Poll::Ready(Ok(()));
        }

        fut.as_mut().poll(cx)
    })
    .await
}
2073
2074fn connect(
2075 addr: rn::SocketAddrAny,
2076 domain: rn::AddressFamily,
2077 protocol: Option<rn::Protocol>,
2078) -> io::Result<rustix::fd::OwnedFd> {
2079 #[cfg(windows)]
2080 use rustix::fd::AsFd;
2081
2082 setup_networking();
2083
2084 #[cfg(any(
2085 target_os = "android",
2086 target_os = "dragonfly",
2087 target_os = "freebsd",
2088 target_os = "fuchsia",
2089 target_os = "illumos",
2090 target_os = "linux",
2091 target_os = "netbsd",
2092 target_os = "openbsd"
2093 ))]
2094 let socket = rn::socket_with(
2095 domain,
2096 rn::SocketType::STREAM,
2097 rn::SocketFlags::CLOEXEC | rn::SocketFlags::NONBLOCK,
2098 protocol,
2099 )?;
2100
2101 #[cfg(not(any(
2102 target_os = "android",
2103 target_os = "dragonfly",
2104 target_os = "freebsd",
2105 target_os = "fuchsia",
2106 target_os = "illumos",
2107 target_os = "linux",
2108 target_os = "netbsd",
2109 target_os = "openbsd"
2110 )))]
2111 let socket = {
2112 #[cfg(not(any(
2113 target_os = "aix",
2114 target_vendor = "apple",
2115 target_os = "espidf",
2116 target_os = "haiku",
2117 windows,
2118 )))]
2119 let flags = rn::SocketFlags::CLOEXEC;
2120 #[cfg(any(
2121 target_os = "aix",
2122 target_vendor = "apple",
2123 target_os = "espidf",
2124 target_os = "haiku",
2125 windows,
2126 ))]
2127 let flags = rn::SocketFlags::empty();
2128
2129 // Create the socket.
2130 let socket = rn::socket_with(domain, rn::SocketType::STREAM, flags, protocol)?;
2131
2132 // Set cloexec if necessary.
2133 #[cfg(any(target_os = "aix", target_vendor = "apple"))]
2134 rio::fcntl_setfd(&socket, rio::fcntl_getfd(&socket)? | rio::FdFlags::CLOEXEC)?;
2135
2136 // Set non-blocking mode.
2137 set_nonblocking(socket.as_fd())?;
2138
2139 socket
2140 };
2141
2142 // Set nosigpipe if necessary.
2143 #[cfg(any(
2144 target_vendor = "apple",
2145 target_os = "freebsd",
2146 target_os = "netbsd",
2147 target_os = "dragonfly",
2148 ))]
2149 rn::sockopt::set_socket_nosigpipe(&socket, true)?;
2150
2151 // Set the handle information to HANDLE_FLAG_INHERIT.
2152 #[cfg(windows)]
2153 unsafe {
2154 if windows_sys::Win32::Foundation::SetHandleInformation(
2155 socket.as_raw_socket() as _,
2156 windows_sys::Win32::Foundation::HANDLE_FLAG_INHERIT,
2157 windows_sys::Win32::Foundation::HANDLE_FLAG_INHERIT,
2158 ) == 0
2159 {
2160 return Err(io::Error::last_os_error());
2161 }
2162 }
2163
2164 #[allow(unreachable_patterns)]
2165 match rn::connect(&socket, &addr) {
2166 Ok(_) => {}
2167 #[cfg(unix)]
2168 Err(rio::Errno::INPROGRESS) => {}
2169 Err(rio::Errno::AGAIN) | Err(rio::Errno::WOULDBLOCK) => {}
2170 Err(err) => return Err(err.into()),
2171 }
2172 Ok(socket)
2173}
2174
/// Performs one-time networking initialization where the platform requires it.
///
/// On Windows this runs `WSAStartup` (via `rustix`) exactly once per process, ignoring its
/// result. On every other target the function body is empty.
#[inline]
fn setup_networking() {
    #[cfg(windows)]
    {
        use std::sync::Once;

        // Networking calls on Windows require WSAStartup to have been called beforehand;
        // the `Once` guarantees it happens a single time no matter how often we get here.
        static WSA_INIT: Once = Once::new();

        WSA_INIT.call_once(|| {
            let _ = rustix::net::wsa_startup();
        });
    }
}
2188
2189#[inline]
2190fn set_nonblocking(
2191 #[cfg(unix)] fd: BorrowedFd<'_>,
2192 #[cfg(windows)] fd: BorrowedSocket<'_>,
2193) -> io::Result<()> {
2194 cfg_if::cfg_if! {
2195 // ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux
2196 // for now, as with the standard library, because it seems to behave
2197 // differently depending on the platform.
2198 // https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d
2199 // https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80
2200 // https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a
2201 if #[cfg(any(windows, target_os = "linux"))] {
2202 rustix::io::ioctl_fionbio(fd, true)?;
2203 } else {
2204 let previous = rustix::fs::fcntl_getfl(fd)?;
2205 let new = previous | rustix::fs::OFlags::NONBLOCK;
2206 if new != previous {
2207 rustix::fs::fcntl_setfl(fd, new)?;
2208 }
2209 }
2210 }
2211
2212 Ok(())
2213}
2214
2215/// Converts a `Path` to its socket address representation.
2216///
2217/// This function is abstract socket-aware.
2218#[cfg(unix)]
2219#[inline]
2220fn convert_path_to_socket_address(path: &Path) -> io::Result<rn::SocketAddrUnix> {
2221 // SocketAddrUnix::new() will throw EINVAL when a path with a zero in it is passed in.
2222 // However, some users expect to be able to pass in paths to abstract sockets, which
2223 // triggers this error as it has a zero in it. Therefore, if a path starts with a zero,
2224 // make it an abstract socket.
2225 #[cfg(any(target_os = "linux", target_os = "android"))]
2226 let address = {
2227 use std::os::unix::ffi::OsStrExt;
2228
2229 let path = path.as_os_str();
2230 match path.as_bytes().first() {
2231 Some(0) => rn::SocketAddrUnix::new_abstract_name(path.as_bytes().get(1..).unwrap())?,
2232 _ => rn::SocketAddrUnix::new(path)?,
2233 }
2234 };
2235
2236 // Only Linux and Android support abstract sockets.
2237 #[cfg(not(any(target_os = "linux", target_os = "android")))]
2238 let address = rn::SocketAddrUnix::new(path)?;
2239
2240 Ok(address)
2241}