local_runtime/
io.rs

1//! Async I/O primitives
2//!
3//! See [`Async`] for more details.
4
5#[cfg(unix)]
6use std::os::{
7    fd::{AsFd, AsRawFd, BorrowedFd},
8    unix::net::UnixStream,
9};
10use std::{
11    fs::File,
12    future::poll_fn,
13    io::{
14        self, BufRead, BufReader, BufWriter, ErrorKind, LineWriter, Read, Stderr, StderrLock,
15        Stdin, StdinLock, Stdout, StdoutLock, Write,
16    },
17    marker::PhantomData,
18    net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
19    pin::Pin,
20    process::{ChildStderr, ChildStdin, ChildStdout},
21    task::{Context, Poll},
22};
23
24use futures_core::Stream;
25use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
26
27use crate::{
28    reactor::{Interest, Source},
29    REACTOR,
30};
31
/// Marker for I/O types whose I/O trait implementations leave the underlying I/O object in place
///
/// [`Async`] deregisters its I/O object from the reactor when it is dropped, so the I/O object
/// must not be closed before that point. Closing it earlier would leave the reactor holding a
/// dangling I/O handle, which violates I/O safety.
///
/// For that reason, functions that grant mutable access to the inner I/O object are unsafe: the
/// access could be used to move or drop the underlying I/O. [`Async`] nevertheless has to call
/// I/O traits such as [`Read`] and [`Write`] in order to implement the async versions of those
/// traits.
///
/// Implementing [`IoSafe`] for an I/O type is the promise that makes those calls acceptable: the
/// type does not move or drop itself inside its I/O trait implementations.
///
/// This trait is implemented for `std` I/O types.
///
/// # Safety
///
/// Implementors of [`IoSafe`] must not drop or move their underlying I/O source in their
/// implementations of [`Read`], [`Write`], and [`BufRead`]. Specifically, the "underlying I/O
/// source" is defined as the I/O primitive corresponding to the type's `AsFd`/`AsSocket`
/// implementation.
pub unsafe trait IoSafe {}
55
56unsafe impl IoSafe for File {}
57unsafe impl IoSafe for Stderr {}
58unsafe impl IoSafe for Stdin {}
59unsafe impl IoSafe for Stdout {}
60unsafe impl IoSafe for StderrLock<'_> {}
61unsafe impl IoSafe for StdinLock<'_> {}
62unsafe impl IoSafe for StdoutLock<'_> {}
63unsafe impl IoSafe for TcpStream {}
64unsafe impl IoSafe for UdpSocket {}
65#[cfg(unix)]
66unsafe impl IoSafe for UnixStream {}
67unsafe impl IoSafe for ChildStdin {}
68unsafe impl IoSafe for ChildStderr {}
69unsafe impl IoSafe for ChildStdout {}
70unsafe impl<T: IoSafe> IoSafe for BufReader<T> {}
71unsafe impl<T: IoSafe + Write> IoSafe for BufWriter<T> {}
72unsafe impl<T: IoSafe + Write> IoSafe for LineWriter<T> {}
73unsafe impl<T: IoSafe + ?Sized> IoSafe for &mut T {}
74unsafe impl<T: IoSafe + ?Sized> IoSafe for Box<T> {}
75
76/// [`IoSafe`] cannot be unconditionally implemented for references, because non-mutable references
77/// can still drop or move internal fields via interior mutability.
78unsafe impl<T: IoSafe + ?Sized> IoSafe for &T {}
79
80// Deregisters the event source on drop to ensure I/O safety
81struct GuardedSource(Source);
82impl Drop for GuardedSource {
83    fn drop(&mut self) {
84        if let Err(err) = REACTOR.with(|r| r.deregister_event(self.0)) {
85            log::error!("Drop failed due to deregistration failure: {err}");
86        }
87    }
88}
89
90/// Async adapter for I/O types
91///
92/// This type puts the I/O object into non-blocking mode, registers it on the reactor, and provides
93/// an async interface for it, including the [`AsyncRead`] and [`AsyncWrite`] traits.
94///
95/// # Supported types
96///
97/// [`Async`] supports any type that implements `AsFd` or `AsSocket`. This includes all standard
98/// networking types. However, `Async` should not be used with types like [`File`] or [`Stdin`],
99/// because they don't work well in non-blocking mode.
100///
101/// # Concurrency
102///
103/// Most operations on [`Async`] take `&self`, so tasks can access it concurrently. However, only
104/// one task can read at a time, and only one task can write at a time. It is fine to have one task
105/// reading while another one writes, but it is not fine to have multiple tasks reading or multiple
106/// tasks writing. Doing so will lead to wakers being lost, which can prevent tasks from waking up
107/// properly.
108///
109/// # Examples
110///
111/// ```no_run
112/// use std::net::TcpStream;
113/// use local_runtime::io::Async;
114/// use futures_lite::AsyncWriteExt;
115///
116/// # local_runtime::block_on(async {
117/// let mut stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 8000)).await?;
118/// stream.write_all(b"hello").await?;
119/// # Ok::<_, std::io::Error>(())
120/// # });
121/// ```
122pub struct Async<T> {
123    // Make sure the handle is dropped before the inner I/O type
124    source: GuardedSource,
125    inner: T,
126    // Make this both !Send and !Sync
127    _phantom: PhantomData<*const ()>,
128}
129
130impl<T> Unpin for Async<T> {}
131
132#[cfg(unix)]
133impl<T: AsFd> Async<T> {
134    /// Create a new async adapter around the I/O object without setting it to non-blocking mode.
135    ///
136    /// This will register the I/O object onto the reactor.
137    ///
138    /// The caller must ensure the I/O object has already been set to non-blocking mode. Otherwise
139    /// it may block the async runtime, preventing other futures from executing on the same thread.
140    ///
141    /// # Error
142    ///
143    /// If there is currently another `Async` constructed on the same I/O object on the current
144    /// thread, this function will return an error.
145    pub fn without_nonblocking(inner: T) -> io::Result<Self> {
146        // SAFETY: GuardedHandle's Drop impl will deregister the FD
147        let source = inner.as_fd().as_raw_fd();
148        unsafe { REACTOR.with(|r| r.register_event(source))? }
149        Ok(Self {
150            inner,
151            source: GuardedSource(source),
152            _phantom: PhantomData,
153        })
154    }
155
156    /// Create a new async adapter around the I/O object.
157    ///
158    /// This will set the I/O object to non-blocking mode and register it onto the reactor.
159    ///
160    /// # Error
161    ///
162    /// If there is currently another `Async` constructed on the same I/O object on the current
163    /// thread, this function will return an error.
164    pub fn new(inner: T) -> io::Result<Self> {
165        set_nonblocking(inner.as_fd())?;
166        Self::without_nonblocking(inner)
167    }
168}
169
170#[cfg(unix)]
171pub(crate) fn set_nonblocking(fd: BorrowedFd) -> io::Result<()> {
172    #[cfg(any(target_os = "linux", target_os = "android"))]
173    rustix::io::ioctl_fionbio(fd, true)?;
174    #[cfg(not(any(target_os = "linux", target_os = "android")))]
175    {
176        let previous = rustix::fs::fcntl_getfl(fd)?;
177        let new = previous | rustix::fs::OFlags::NONBLOCK;
178        if new != previous {
179            rustix::fs::fcntl_setfl(fd, new)?;
180        }
181    }
182    Ok(())
183}
184
185impl<T> Async<T> {
186    /// Get reference to inner I/O handle
187    pub fn get_ref(&self) -> &T {
188        &self.inner
189    }
190
191    /// Deregisters the I/O handle from the reactor and return it
192    pub fn into_inner(self) -> T {
193        self.inner
194    }
195
196    unsafe fn poll_event<'a, P, F>(
197        &'a self,
198        interest: Interest,
199        cx: &mut Context,
200        f: F,
201    ) -> Poll<io::Result<P>>
202    where
203        F: FnOnce(&'a T) -> io::Result<P>,
204    {
205        match f(&self.inner) {
206            Ok(n) => return Poll::Ready(Ok(n)),
207            Err(err) if err.kind() == ErrorKind::WouldBlock => {}
208            Err(err) => return Poll::Ready(Err(err)),
209        }
210        REACTOR.with(|r| r.enable_event(self.source.0, interest, cx.waker()))?;
211        Poll::Pending
212    }
213
214    unsafe fn poll_event_mut<'a, P, F>(
215        &'a mut self,
216        interest: Interest,
217        cx: &mut Context,
218        f: F,
219    ) -> Poll<io::Result<P>>
220    where
221        F: FnOnce(&'a mut T) -> io::Result<P>,
222    {
223        match f(&mut self.inner) {
224            Ok(n) => return Poll::Ready(Ok(n)),
225            Err(err) if err.kind() == ErrorKind::WouldBlock => {}
226            Err(err) => return Poll::Ready(Err(err)),
227        }
228        REACTOR.with(|r| r.enable_event(self.source.0, interest, cx.waker()))?;
229        Poll::Pending
230    }
231
232    /// Perform a single non-blocking read operation
233    ///
234    /// The underlying I/O object is read by the `f` closure once. If the result is
235    /// [`io::ErrorKind::WouldBlock`], then this method returns [`Poll::Pending`] and tells the
236    /// reactor to notify the context `cx` when the I/O object becomes readable.
237    ///
238    /// The closure should not perform multiple I/O operations, such as calling
239    /// [`Write::write_all`]. This is because the closure is restarted for each poll, so the
240    /// first I/O operation will be repeated and the subsequent operations won't be completed.
241    ///
242    /// # Safety
243    ///
244    /// The closure must not drop the underlying I/O object.
245    ///
246    /// # Example
247    ///
248    /// The non-blocking read operation can be converted into a future by wrapping this method in
249    /// [`poll_fn`].
250    ///
251    /// ```no_run
252    /// use std::net::TcpListener;
253    /// use std::future::poll_fn;
254    /// use local_runtime::io::Async;
255    ///
256    /// # local_runtime::block_on(async {
257    /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
258    /// // Accept connections asynchronously
259    /// let (stream, addr) = poll_fn(|cx| unsafe { listener.poll_read_with(cx, |l| l.accept()) }).await?;
260    /// # Ok::<_, std::io::Error>(())
261    /// # });
262    /// ```
263    pub unsafe fn poll_read_with<'a, P, F>(&'a self, cx: &mut Context, f: F) -> Poll<io::Result<P>>
264    where
265        F: FnOnce(&'a T) -> io::Result<P>,
266    {
267        self.poll_event(Interest::Read, cx, f)
268    }
269
270    /// Same as [`Self::poll_read_with`], but takes a mutable reference in the closure
271    ///
272    /// # Safety
273    ///
274    /// The closure must not drop the underlying I/O object.
275    pub unsafe fn poll_read_with_mut<'a, P, F>(
276        &'a mut self,
277        cx: &mut Context,
278        f: F,
279    ) -> Poll<io::Result<P>>
280    where
281        F: FnOnce(&'a mut T) -> io::Result<P>,
282    {
283        self.poll_event_mut(Interest::Read, cx, f)
284    }
285
286    /// Perform a single non-blocking write operation
287    ///
288    /// The underlying I/O object is write by the `f` closure once. If the result is
289    /// [`io::ErrorKind::WouldBlock`], then this method returns [`Poll::Pending`] and tells the
290    /// reactor to notify the context `cx` when the I/O object becomes writable.
291    ///
292    /// The closure should not perform multiple I/O operations, such as calling
293    /// [`Write::write_all`]. This is because the closure is restarted for each poll, so the
294    /// first I/O operation will be repeated and the subsequent operations won't be completed.
295    ///
296    /// # Safety
297    ///
298    /// The closure must not drop the underlying I/O object.
299    ///
300    /// # Example
301    ///
302    /// The non-blocking write operation can be converted into a future by wrapping this method in
303    /// [`poll_fn`].
304    ///
305    /// ```no_run
306    /// use std::net::TcpStream;
307    /// use std::future::poll_fn;
308    /// use std::io::Write;
309    /// use local_runtime::io::Async;
310    ///
311    /// # local_runtime::block_on(async {
312    /// let mut stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 8000)).await?;
313    /// // Write some data asynchronously
314    /// poll_fn(|cx| unsafe { stream.poll_write_with(cx, |mut s| s.write(b"hello")) }).await?;
315    /// # Ok::<_, std::io::Error>(())
316    /// # });
317    /// ```
318    pub unsafe fn poll_write_with<'a, P, F>(&'a self, cx: &mut Context, f: F) -> Poll<io::Result<P>>
319    where
320        F: FnOnce(&'a T) -> io::Result<P>,
321    {
322        self.poll_event(Interest::Write, cx, f)
323    }
324
325    /// Same as [`Self::poll_write_with`], but takes a mutable reference in the closure
326    ///
327    /// # Safety
328    ///
329    /// The closure must not drop the underlying I/O object.
330    pub unsafe fn poll_write_with_mut<'a, P, F>(
331        &'a mut self,
332        cx: &mut Context,
333        f: F,
334    ) -> Poll<io::Result<P>>
335    where
336        F: FnOnce(&'a mut T) -> io::Result<P>,
337    {
338        self.poll_event_mut(Interest::Write, cx, f)
339    }
340
341    async fn wait_for_event_ready(&self, interest: Interest) -> io::Result<()> {
342        let mut first_call = true;
343        poll_fn(|cx| {
344            if first_call {
345                first_call = false;
346                // First enable the event
347                REACTOR.with(|r| r.enable_event(self.source.0, interest, cx.waker()))?;
348                Poll::Pending
349            } else {
350                // Then, check if the event is ready
351                match REACTOR.with(|r| r.is_event_ready(self.source.0, interest)) {
352                    true => Poll::Ready(Ok(())),
353                    // If not, then update the event waker
354                    false => {
355                        REACTOR.with(|r| r.enable_event(self.source.0, interest, cx.waker()))?;
356                        Poll::Pending
357                    }
358                }
359            }
360        })
361        .await
362    }
363
364    /// Waits until the I/O object is available to write without blocking
365    pub async fn writable(&self) -> io::Result<()> {
366        self.wait_for_event_ready(Interest::Write).await
367    }
368
369    /// Waits until the I/O object is available to read without blocking
370    pub async fn readable(&self) -> io::Result<()> {
371        self.wait_for_event_ready(Interest::Read).await
372    }
373}
374
375impl<T: Read + IoSafe> AsyncRead for Async<T> {
376    fn poll_read(
377        mut self: Pin<&mut Self>,
378        cx: &mut Context,
379        buf: &mut [u8],
380    ) -> Poll<io::Result<usize>> {
381        // Safety: IoSafe is implemented
382        unsafe { self.poll_event_mut(Interest::Read, cx, |inner| inner.read(buf)) }
383    }
384}
385
386impl<'a, T> AsyncRead for &'a Async<T>
387where
388    &'a T: Read + IoSafe,
389{
390    fn poll_read(
391        self: Pin<&mut Self>,
392        cx: &mut Context,
393        buf: &mut [u8],
394    ) -> Poll<io::Result<usize>> {
395        // Safety: IoSafe is implemented
396        unsafe { self.poll_event(Interest::Read, cx, |mut inner| inner.read(buf)) }
397    }
398}
399
400impl<T: Write + IoSafe> AsyncWrite for Async<T> {
401    fn poll_write(
402        mut self: Pin<&mut Self>,
403        cx: &mut Context,
404        buf: &[u8],
405    ) -> Poll<io::Result<usize>> {
406        // Safety: IoSafe is implemented
407        unsafe { self.poll_event_mut(Interest::Write, cx, |inner| inner.write(buf)) }
408    }
409
410    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
411        // Safety: IoSafe is implemented
412        unsafe { self.poll_event_mut(Interest::Write, cx, |inner| inner.flush()) }
413    }
414
415    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
416        self.poll_flush(cx)
417    }
418}
419
420impl<'a, T> AsyncWrite for &'a Async<T>
421where
422    &'a T: Write + IoSafe,
423{
424    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
425        // Safety: IoSafe is implemented
426        unsafe { self.poll_event(Interest::Write, cx, |mut inner| inner.write(buf)) }
427    }
428
429    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
430        // Safety: IoSafe is implemented
431        unsafe { self.poll_event(Interest::Write, cx, |mut inner| inner.flush()) }
432    }
433
434    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
435        self.poll_flush(cx)
436    }
437}
438
439impl<T: BufRead + IoSafe> AsyncBufRead for Async<T> {
440    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<&[u8]>> {
441        let this = self.get_mut();
442        // Safety: IoSafe is implemented
443        unsafe { this.poll_event_mut(Interest::Read, cx, |inner| inner.fill_buf()) }
444    }
445
446    fn consume(mut self: Pin<&mut Self>, amt: usize) {
447        BufRead::consume(&mut self.inner, amt);
448    }
449}
450
451impl Async<TcpListener> {
452    /// Create a TCP listener bound to a specific address
453    ///
454    /// # Example
455    ///
456    /// Bind the TCP listener to an OS-assigned port at 127.0.0.1.
457    ///
458    /// ```no_run
459    /// use std::net::TcpListener;
460    /// use local_runtime::io::Async;
461    ///
462    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
463    /// # Ok::<_, std::io::Error>(())
464    /// ```
465    pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Self> {
466        Async::new(TcpListener::bind(addr.into())?)
467    }
468
469    fn poll_accept(&self, cx: &mut Context) -> Poll<io::Result<(Async<TcpStream>, SocketAddr)>> {
470        // Safety: accept() is I/O safe
471        unsafe {
472            self.poll_event(Interest::Read, cx, |inner| {
473                inner
474                    .accept()
475                    .and_then(|(st, addr)| Async::new(st).map(|st| (st, addr)))
476            })
477        }
478    }
479
480    /// Accept a new incoming TCP connection from this listener
481    ///
482    /// # Example
483    ///
484    /// ```no_run
485    /// use std::net::TcpListener;
486    /// use local_runtime::io::Async;
487    ///
488    /// # local_runtime::block_on(async {
489    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
490    /// let (stream, addr) = listener.accept().await?;
491    /// # Ok::<_, std::io::Error>(())
492    /// # });
493    /// ```
494    pub async fn accept(&self) -> io::Result<(Async<TcpStream>, SocketAddr)> {
495        poll_fn(|cx| self.poll_accept(cx)).await
496    }
497
498    /// Return a stream of incoming TCP connections
499    ///
500    /// The returned stream will never return `None`.
501    ///
502    /// # Example
503    ///
504    /// ```no_run
505    /// use std::net::TcpListener;
506    /// use local_runtime::io::Async;
507    /// use futures_lite::StreamExt;
508    ///
509    /// # local_runtime::block_on(async {
510    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
511    /// let mut incoming = listener.incoming();
512    /// while let Some(stream) = incoming.next().await {
513    ///     let stream = stream?;
514    /// }
515    /// # Ok::<_, std::io::Error>(())
516    /// # });
517    /// ```
518    pub fn incoming(&self) -> IncomingTcp {
519        IncomingTcp { listener: self }
520    }
521}
522
523/// Stream returned by [`Async::<TcpListener>::incoming`]
524#[must_use = "Streams do nothing unless polled"]
525pub struct IncomingTcp<'a> {
526    listener: &'a Async<TcpListener>,
527}
528
529impl Stream for IncomingTcp<'_> {
530    type Item = io::Result<Async<TcpStream>>;
531
532    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
533        self.listener
534            .poll_accept(cx)
535            .map(|pair| pair.map(|(st, _)| st))
536            .map(Some)
537    }
538}
539
540impl Async<TcpStream> {
541    /// Create a TCP connection to the specified address
542    ///
543    /// ```no_run
544    /// use std::net::TcpStream;
545    /// use local_runtime::io::Async;
546    ///
547    /// # local_runtime::block_on(async {
548    /// let listener = Async::<TcpStream>::connect(([127, 0, 0, 1], 8000)).await?;
549    /// # Ok::<_, std::io::Error>(())
550    /// # });
551    /// ```
552    pub async fn connect<A: Into<SocketAddr>>(addr: A) -> io::Result<Self> {
553        let addr = addr.into();
554        let stream = Async::without_nonblocking(tcp_socket(&addr)?)?;
555
556        // Initiate the connection
557        connect(&stream.inner, &addr)?;
558        // Wait for the stream to be writable
559        stream.wait_for_event_ready(Interest::Write).await?;
560        // Check for errors
561        stream.inner.peer_addr()?;
562        Ok(stream)
563    }
564
565    /// Reads data from the stream without removing it from the buffer.
566    ///
567    /// Returns the number of bytes read. Successive calls of this method read the same data.
568    pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
569        // Safety: peek() is I/O safe
570        unsafe { poll_fn(|cx| self.poll_event(Interest::Read, cx, |inner| inner.peek(buf))).await }
571    }
572}
573
574#[cfg(unix)]
575fn tcp_socket(addr: &SocketAddr) -> io::Result<TcpStream> {
576    use rustix::net::*;
577
578    let af = match addr {
579        SocketAddr::V4(_) => AddressFamily::INET,
580        SocketAddr::V6(_) => AddressFamily::INET6,
581    };
582    let type_ = SocketType::STREAM;
583
584    #[cfg(any(target_os = "linux", target_os = "android"))]
585    let socket = socket_with(
586        af,
587        type_,
588        SocketFlags::NONBLOCK | SocketFlags::CLOEXEC,
589        None,
590    )?;
591    #[cfg(not(any(target_os = "linux", target_os = "android")))]
592    let socket = {
593        let socket = socket_with(af, type_, SocketFlags::empty(), None)?;
594        let previous = rustix::fs::fcntl_getfl(&socket)?;
595        let new = previous | rustix::fs::OFlags::NONBLOCK | rustix::fs::OFlags::CLOEXEC;
596        if new != previous {
597            rustix::fs::fcntl_setfl(&socket, new)?;
598        }
599        socket
600    };
601
602    Ok(socket.into())
603}
604
605#[cfg(unix)]
606fn connect(tcp: &TcpStream, addr: &SocketAddr) -> io::Result<()> {
607    match rustix::net::connect(tcp.as_fd(), addr) {
608        Ok(()) => Ok(()),
609        Err(rustix::io::Errno::INPROGRESS | rustix::io::Errno::WOULDBLOCK) => Ok(()),
610        Err(err) => Err(err.into()),
611    }
612}
613
614impl Async<UdpSocket> {
615    /// Create a UDP socket from the given address
616    ///
617    /// # Example
618    ///
619    /// ```no_run
620    /// use std::net::UdpSocket;
621    /// use local_runtime::io::Async;
622    ///
623    /// # local_runtime::block_on(async {
624    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
625    /// println!("Bound to {}", socket.get_ref().local_addr()?);
626    /// # Ok::<_, std::io::Error>(())
627    /// # });
628    /// ```
629    pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<UdpSocket>> {
630        Async::new(UdpSocket::bind(addr.into())?)
631    }
632
633    /// Receives a single datagram message on a socket
634    ///
635    /// Returns the number of bytes read and the origin address.
636    ///
637    /// The function must be called with valid byte array `buf` of sufficient size to hold the
638    /// message bytes. If a message is too long to fit in the supplied buffer, excess bytes may be
639    /// discarded.
640    ///
641    /// # Example
642    ///
643    /// ```no_run
644    /// use std::net::UdpSocket;
645    /// use local_runtime::io::Async;
646    ///
647    /// # local_runtime::block_on(async {
648    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
649    ///
650    /// let mut buf = [0u8; 1024];
651    /// let (len, addr) = socket.recv_from(&mut buf).await?;
652    /// # Ok::<_, std::io::Error>(())
653    /// # });
654    /// ```
655    pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
656        poll_fn(|cx| unsafe { self.poll_read_with(cx, |inner| inner.recv_from(buf)) }).await
657    }
658
659    /// Receives a single datagram message without removing it from the queue
660    ///
661    /// Returns the number of bytes read and the origin address.
662    ///
663    /// The function must be called with valid byte array `buf` of sufficient size to hold the
664    /// message bytes. If a message is too long to fit in the supplied buffer, excess bytes may be
665    /// discarded.
666    pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
667        poll_fn(|cx| unsafe { self.poll_read_with(cx, |inner| inner.peek_from(buf)) }).await
668    }
669
670    /// Send data to the specified address
671    ///
672    /// Return the number of bytes written
673    ///
674    /// # Example
675    ///
676    /// ```no_run
677    /// use std::net::UdpSocket;
678    /// use local_runtime::io::Async;
679    ///
680    /// # local_runtime::block_on(async {
681    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
682    /// let addr = socket.get_ref().local_addr()?;
683    ///
684    /// let len = socket.send_to(b"hello", addr).await?;
685    /// # Ok::<_, std::io::Error>(())
686    /// # });
687    /// ```
688    pub async fn send_to<A: Into<SocketAddr>>(&self, buf: &[u8], addr: A) -> io::Result<usize> {
689        let addr = addr.into();
690        poll_fn(|cx| unsafe { self.poll_write_with(cx, |inner| inner.send_to(buf, addr)) }).await
691    }
692
693    /// Connect this UDP socket to a remote address, allowing the [`send`](Async::send) and
694    /// [`recv`](Async::recv) methods to be called
695    ///
696    /// Also applies filters to only receive data from the specified address.
697    pub fn connect<A: Into<SocketAddr>>(&self, addr: A) -> io::Result<()> {
698        self.get_ref().connect(addr.into())
699    }
700
701    /// Receives a single datagram message from the connected peer
702    ///
703    /// Returns the number of bytes read.
704    ///
705    /// The function must be called with valid byte array `buf` of sufficient size to hold the
706    /// message bytes. If a message is too long to fit in the supplied buffer, excess bytes may be
707    /// discarded.
708    ///
709    /// This method should only be called after connecting the socket to a remote address via the
710    /// [`connect`](Async::<UdpSocket>::connect) method.
711    ///
712    /// # Example
713    ///
714    /// ```no_run
715    /// use std::net::UdpSocket;
716    /// use local_runtime::io::Async;
717    ///
718    /// # local_runtime::block_on(async {
719    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
720    /// socket.connect(([127, 0, 0, 1], 9000))?;
721    ///
722    /// let mut buf = [0u8; 1024];
723    /// let len = socket.recv(&mut buf).await?;
724    /// # Ok::<_, std::io::Error>(())
725    /// # });
726    /// ```
727    pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
728        poll_fn(|cx| unsafe { self.poll_read_with(cx, |inner| inner.recv(buf)) }).await
729    }
730
731    /// Receives a single datagram message from the connected peer without removing it from the queue
732    ///
733    /// Returns the number of bytes read.
734    ///
735    /// The function must be called with valid byte array `buf` of sufficient size to hold the
736    /// message bytes. If a message is too long to fit in the supplied buffer, excess bytes may be
737    /// discarded.
738    ///
739    /// This method should only be called after connecting the socket to a remote address via the
740    /// [`connect`](Async::<UdpSocket>::connect) method.
741    pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
742        poll_fn(|cx| unsafe { self.poll_read_with(cx, |inner| inner.peek(buf)) }).await
743    }
744
745    /// Send data to the connected peer
746    ///
747    /// Return the number of bytes written.
748    ///
749    /// This method should only be called after connecting the socket to a remote address via the
750    /// [`connect`](Async::<UdpSocket>::connect) method.
751    ///
752    /// # Example
753    ///
754    /// ```no_run
755    /// use std::net::UdpSocket;
756    /// use local_runtime::io::Async;
757    ///
758    /// # local_runtime::block_on(async {
759    /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
760    /// socket.connect(([127, 0, 0, 1], 9000))?;
761    ///
762    /// let len = socket.send(b"hello").await?;
763    /// # Ok::<_, std::io::Error>(())
764    /// # });
765    /// ```
766    pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
767        poll_fn(|cx| unsafe { self.poll_write_with(cx, |inner| inner.send(buf)) }).await
768    }
769}
770
#[cfg(test)]
mod tests {
    use std::{future::Future, io::stderr, pin::pin, sync::Arc};

    use rustix::pipe::pipe;

    use crate::{block_on, test::MockWaker};

    use super::*;

    // Dropping an `Async` must remove its registration from the reactor
    #[test]
    fn deregister_on_drop() {
        // Unwrap so that a registration failure is reported directly instead of surfacing as
        // a confusing assertion failure on the next line
        let io = Async::without_nonblocking(stderr()).unwrap();
        assert!(!REACTOR.with(|r| r.is_empty()));
        drop(io);
        assert!(REACTOR.with(|r| r.is_empty()));
    }

    // Taking the inner I/O object out must remove the registration as well
    #[test]
    fn deregister_into_inner() {
        let io = Async::without_nonblocking(stderr()).unwrap();
        assert!(!REACTOR.with(|r| r.is_empty()));
        let _inner = io.into_inner();
        assert!(REACTOR.with(|r| r.is_empty()));
    }

    // Accept and connect are both pending on their first poll and complete under `block_on`
    #[test]
    fn tcp() {
        let accept_waker = Arc::new(MockWaker::default());
        let connect_waker = Arc::new(MockWaker::default());

        let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0)).unwrap();
        let addr = listener.get_ref().local_addr().unwrap();
        let mut accept = pin!(listener.accept());
        assert!(accept
            .as_mut()
            .poll(&mut Context::from_waker(&accept_waker.clone().into()))
            .is_pending());
        let mut connect = pin!(Async::<TcpStream>::connect(addr));
        assert!(connect
            .as_mut()
            .poll(&mut Context::from_waker(&connect_waker.clone().into()))
            .is_pending());

        block_on(async {
            let _accepted = accept.await.unwrap();
            let _connected = connect.await.unwrap();
        });

        let mut connect = pin!(Async::<TcpStream>::connect(addr));
        assert!(connect
            .as_mut()
            .poll(&mut Context::from_waker(&connect_waker.into()))
            .is_pending());
    }

    // `writable`/`readable` resolve only after the reactor has reported the matching event
    #[test]
    fn writable_readable() {
        let wr_waker = Arc::new(MockWaker::default());
        let rd_waker = Arc::new(MockWaker::default());

        let (read, write) = pipe().unwrap();
        set_nonblocking(read.as_fd()).unwrap();
        set_nonblocking(write.as_fd()).unwrap();

        let reader = Async::new(read).unwrap();
        let writer = Async::new(write).unwrap();

        let mut writable = pin!(writer.writable());
        assert!(writable
            .as_mut()
            .poll(&mut Context::from_waker(&wr_waker.clone().into()))
            .is_pending());
        REACTOR.with(|r| r.wait()).unwrap();
        assert!(wr_waker.get());
        assert!(writable
            .as_mut()
            .poll(&mut Context::from_waker(&wr_waker.clone().into()))
            .is_ready());

        let mut readable = pin!(reader.readable());
        assert!(readable
            .as_mut()
            .poll(&mut Context::from_waker(&rd_waker.clone().into()))
            .is_pending());
        // Write one byte to pipe
        unsafe {
            assert!(writer
                .poll_write_with(&mut Context::from_waker(&wr_waker.clone().into()), |w| {
                    rustix::io::write(w, &[0]).map_err(Into::into)
                })
                .is_ready());
        };
        REACTOR.with(|r| r.wait()).unwrap();
        assert!(rd_waker.get());
        assert!(readable
            .as_mut()
            .poll(&mut Context::from_waker(&rd_waker.clone().into()))
            .is_ready());
    }
}
871}