local_runtime/io.rs
1//! Async I/O primitives
2//!
3//! See [`Async`] for more details.
4
5#[cfg(unix)]
6use std::os::{
7 fd::{AsFd, AsRawFd, BorrowedFd},
8 unix::net::UnixStream,
9};
10use std::{
11 fs::File,
12 future::poll_fn,
13 io::{
14 self, BufRead, BufReader, BufWriter, ErrorKind, LineWriter, Read, Stderr, StderrLock,
15 Stdin, StdinLock, Stdout, StdoutLock, Write,
16 },
17 marker::PhantomData,
18 net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
19 pin::Pin,
20 process::{ChildStderr, ChildStdin, ChildStdout},
21 task::{Context, Poll},
22};
23
24use futures_core::Stream;
25use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
26
27use crate::{
28 reactor::{Interest, Source},
29 REACTOR,
30};
31
/// Types whose I/O trait implementations do not move or drop the underlying I/O object
///
/// The I/O object inside [`Async`] cannot be closed before the [`Async`] is dropped, because
/// [`Async`] deregisters the I/O object from the reactor on drop. Closing the I/O object before
/// deregistering it leaves the reactor holding a dangling I/O handle, which violates I/O safety.
///
/// As such, functions that grant mutable access to the inner I/O object are unsafe, because they
/// may move or drop the underlying I/O. Unfortunately, [`Async`] needs to call I/O traits such as
/// [`Read`] and [`Write`] to implement the async versions of those traits.
///
/// To signal that those traits are safe to call on an I/O type, the type must implement
/// [`IoSafe`], which acts as a promise that the I/O type doesn't move or drop itself in its I/O
/// trait implementations.
///
/// This trait is implemented for `std` I/O types.
///
/// # Safety
///
/// Implementors of [`IoSafe`] must not drop or move their underlying I/O source in their
/// implementations of [`Read`], [`Write`], and [`BufRead`]. Specifically, the "underlying I/O
/// source" is defined as the I/O primitive corresponding to the type's `AsFd`/`AsSocket`
/// implementation.
pub unsafe trait IoSafe {}
55
// Marker impls for the `std` I/O primitives.
unsafe impl IoSafe for File {}
unsafe impl IoSafe for Stderr {}
unsafe impl IoSafe for Stdin {}
unsafe impl IoSafe for Stdout {}
unsafe impl IoSafe for StderrLock<'_> {}
unsafe impl IoSafe for StdinLock<'_> {}
unsafe impl IoSafe for StdoutLock<'_> {}
unsafe impl IoSafe for TcpStream {}
unsafe impl IoSafe for UdpSocket {}
#[cfg(unix)]
unsafe impl IoSafe for UnixStream {}
unsafe impl IoSafe for ChildStdin {}
unsafe impl IoSafe for ChildStderr {}
unsafe impl IoSafe for ChildStdout {}
// The `std` buffering wrappers, mutable references and boxes are `IoSafe` whenever the type they
// wrap is.
unsafe impl<T: IoSafe> IoSafe for BufReader<T> {}
unsafe impl<T: IoSafe + Write> IoSafe for BufWriter<T> {}
unsafe impl<T: IoSafe + Write> IoSafe for LineWriter<T> {}
unsafe impl<T: IoSafe + ?Sized> IoSafe for &mut T {}
unsafe impl<T: IoSafe + ?Sized> IoSafe for Box<T> {}

/// [`IoSafe`] cannot be unconditionally implemented for references, because non-mutable references
/// can still drop or move internal fields via interior mutability.
///
/// For that reason this impl is conditional: a shared reference is only [`IoSafe`] when the
/// referenced type is itself [`IoSafe`].
unsafe impl<T: IoSafe + ?Sized> IoSafe for &T {}
79
80// Deregisters the event source on drop to ensure I/O safety
81struct GuardedSource(Source);
82impl Drop for GuardedSource {
83 fn drop(&mut self) {
84 if let Err(err) = REACTOR.with(|r| r.deregister_event(self.0)) {
85 log::error!("Drop failed due to deregistration failure: {err}");
86 }
87 }
88}
89
/// Async adapter for I/O types
///
/// This type puts the I/O object into non-blocking mode, registers it on the reactor, and provides
/// an async interface for it, including the [`AsyncRead`] and [`AsyncWrite`] traits.
///
/// # Supported types
///
/// [`Async`] supports any type that implements `AsFd` or `AsSocket`. This includes all standard
/// networking types. However, [`Async`] should not be used with types like [`File`] or [`Stdin`],
/// because they don't work well in non-blocking mode.
///
/// # Concurrency
///
/// Most operations on [`Async`] take `&self`, so tasks can access it concurrently. However, only
/// one task can read at a time, and only one task can write at a time. It is fine to have one task
/// reading while another one writes, but it is not fine to have multiple tasks reading or multiple
/// tasks writing. Doing so will lead to wakers being lost, which can prevent tasks from waking up
/// properly.
///
/// # Examples
///
/// ```no_run
/// use std::net::TcpStream;
/// use local_runtime::io::Async;
/// use futures_lite::AsyncWriteExt;
///
/// # local_runtime::block_on(async {
/// let mut stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 8000)).await?;
/// stream.write_all(b"hello").await?;
/// # Ok::<_, std::io::Error>(())
/// # });
/// ```
pub struct Async<T> {
    // Make sure the handle is dropped before the inner I/O type. Struct fields are dropped in
    // declaration order, so `source` must stay declared ahead of `inner`: that way the I/O object
    // is deregistered from the reactor before it is closed.
    source: GuardedSource,
    // The wrapped I/O object
    inner: T,
    // Make this both !Send and !Sync
    _phantom: PhantomData<*const ()>,
}

// Explicit impl is needed because the raw pointer in `_phantom` and a possibly `!Unpin` `T` would
// otherwise make `Async<T>` `!Unpin`. The async trait impls in this file rely on `Async` being
// `Unpin` to get `&mut Self` out of `Pin<&mut Self>`.
impl<T> Unpin for Async<T> {}
131
132#[cfg(unix)]
133impl<T: AsFd> Async<T> {
134 /// Create a new async adapter around the I/O object without setting it to non-blocking mode.
135 ///
136 /// This will register the I/O object onto the reactor.
137 ///
138 /// The caller must ensure the I/O object has already been set to non-blocking mode. Otherwise
139 /// it may block the async runtime, preventing other futures from executing on the same thread.
140 ///
141 /// # Error
142 ///
143 /// If there is currently another `Async` constructed on the same I/O object on the current
144 /// thread, this function will return an error.
145 pub fn without_nonblocking(inner: T) -> io::Result<Self> {
146 // SAFETY: GuardedHandle's Drop impl will deregister the FD
147 let source = inner.as_fd().as_raw_fd();
148 unsafe { REACTOR.with(|r| r.register_event(source))? }
149 Ok(Self {
150 inner,
151 source: GuardedSource(source),
152 _phantom: PhantomData,
153 })
154 }
155
156 /// Create a new async adapter around the I/O object.
157 ///
158 /// This will set the I/O object to non-blocking mode and register it onto the reactor.
159 ///
160 /// # Error
161 ///
162 /// If there is currently another `Async` constructed on the same I/O object on the current
163 /// thread, this function will return an error.
164 pub fn new(inner: T) -> io::Result<Self> {
165 set_nonblocking(inner.as_fd())?;
166 Self::without_nonblocking(inner)
167 }
168}
169
170#[cfg(unix)]
171pub(crate) fn set_nonblocking(fd: BorrowedFd) -> io::Result<()> {
172 #[cfg(any(target_os = "linux", target_os = "android"))]
173 rustix::io::ioctl_fionbio(fd, true)?;
174 #[cfg(not(any(target_os = "linux", target_os = "android")))]
175 {
176 let previous = rustix::fs::fcntl_getfl(fd)?;
177 let new = previous | rustix::fs::OFlags::NONBLOCK;
178 if new != previous {
179 rustix::fs::fcntl_setfl(fd, new)?;
180 }
181 }
182 Ok(())
183}
184
185impl<T> Async<T> {
186 /// Get reference to inner I/O handle
187 pub fn get_ref(&self) -> &T {
188 &self.inner
189 }
190
191 /// Deregisters the I/O handle from the reactor and return it
192 pub fn into_inner(self) -> T {
193 self.inner
194 }
195
196 unsafe fn poll_event<'a, P, F>(
197 &'a self,
198 interest: Interest,
199 cx: &mut Context,
200 f: F,
201 ) -> Poll<io::Result<P>>
202 where
203 F: FnOnce(&'a T) -> io::Result<P>,
204 {
205 match f(&self.inner) {
206 Ok(n) => return Poll::Ready(Ok(n)),
207 Err(err) if err.kind() == ErrorKind::WouldBlock => {}
208 Err(err) => return Poll::Ready(Err(err)),
209 }
210 REACTOR.with(|r| r.enable_event(self.source.0, interest, cx.waker()))?;
211 Poll::Pending
212 }
213
214 unsafe fn poll_event_mut<'a, P, F>(
215 &'a mut self,
216 interest: Interest,
217 cx: &mut Context,
218 f: F,
219 ) -> Poll<io::Result<P>>
220 where
221 F: FnOnce(&'a mut T) -> io::Result<P>,
222 {
223 match f(&mut self.inner) {
224 Ok(n) => return Poll::Ready(Ok(n)),
225 Err(err) if err.kind() == ErrorKind::WouldBlock => {}
226 Err(err) => return Poll::Ready(Err(err)),
227 }
228 REACTOR.with(|r| r.enable_event(self.source.0, interest, cx.waker()))?;
229 Poll::Pending
230 }
231
232 /// Perform a single non-blocking read operation
233 ///
234 /// The underlying I/O object is read by the `f` closure once. If the result is
235 /// [`io::ErrorKind::WouldBlock`], then this method returns [`Poll::Pending`] and tells the
236 /// reactor to notify the context `cx` when the I/O object becomes readable.
237 ///
238 /// The closure should not perform multiple I/O operations, such as calling
239 /// [`Write::write_all`]. This is because the closure is restarted for each poll, so the
240 /// first I/O operation will be repeated and the subsequent operations won't be completed.
241 ///
242 /// # Safety
243 ///
244 /// The closure must not drop the underlying I/O object.
245 ///
246 /// # Example
247 ///
248 /// The non-blocking read operation can be converted into a future by wrapping this method in
249 /// [`poll_fn`].
250 ///
251 /// ```no_run
252 /// use std::net::TcpListener;
253 /// use std::future::poll_fn;
254 /// use local_runtime::io::Async;
255 ///
256 /// # local_runtime::block_on(async {
257 /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
258 /// // Accept connections asynchronously
259 /// let (stream, addr) = poll_fn(|cx| unsafe { listener.poll_read_with(cx, |l| l.accept()) }).await?;
260 /// # Ok::<_, std::io::Error>(())
261 /// # });
262 /// ```
263 pub unsafe fn poll_read_with<'a, P, F>(&'a self, cx: &mut Context, f: F) -> Poll<io::Result<P>>
264 where
265 F: FnOnce(&'a T) -> io::Result<P>,
266 {
267 self.poll_event(Interest::Read, cx, f)
268 }
269
270 /// Same as [`Self::poll_read_with`], but takes a mutable reference in the closure
271 ///
272 /// # Safety
273 ///
274 /// The closure must not drop the underlying I/O object.
275 pub unsafe fn poll_read_with_mut<'a, P, F>(
276 &'a mut self,
277 cx: &mut Context,
278 f: F,
279 ) -> Poll<io::Result<P>>
280 where
281 F: FnOnce(&'a mut T) -> io::Result<P>,
282 {
283 self.poll_event_mut(Interest::Read, cx, f)
284 }
285
286 /// Perform a single non-blocking write operation
287 ///
288 /// The underlying I/O object is write by the `f` closure once. If the result is
289 /// [`io::ErrorKind::WouldBlock`], then this method returns [`Poll::Pending`] and tells the
290 /// reactor to notify the context `cx` when the I/O object becomes writable.
291 ///
292 /// The closure should not perform multiple I/O operations, such as calling
293 /// [`Write::write_all`]. This is because the closure is restarted for each poll, so the
294 /// first I/O operation will be repeated and the subsequent operations won't be completed.
295 ///
296 /// # Safety
297 ///
298 /// The closure must not drop the underlying I/O object.
299 ///
300 /// # Example
301 ///
302 /// The non-blocking write operation can be converted into a future by wrapping this method in
303 /// [`poll_fn`].
304 ///
305 /// ```no_run
306 /// use std::net::TcpStream;
307 /// use std::future::poll_fn;
308 /// use std::io::Write;
309 /// use local_runtime::io::Async;
310 ///
311 /// # local_runtime::block_on(async {
312 /// let mut stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 8000)).await?;
313 /// // Write some data asynchronously
314 /// poll_fn(|cx| unsafe { stream.poll_write_with(cx, |mut s| s.write(b"hello")) }).await?;
315 /// # Ok::<_, std::io::Error>(())
316 /// # });
317 /// ```
318 pub unsafe fn poll_write_with<'a, P, F>(&'a self, cx: &mut Context, f: F) -> Poll<io::Result<P>>
319 where
320 F: FnOnce(&'a T) -> io::Result<P>,
321 {
322 self.poll_event(Interest::Write, cx, f)
323 }
324
325 /// Same as [`Self::poll_write_with`], but takes a mutable reference in the closure
326 ///
327 /// # Safety
328 ///
329 /// The closure must not drop the underlying I/O object.
330 pub unsafe fn poll_write_with_mut<'a, P, F>(
331 &'a mut self,
332 cx: &mut Context,
333 f: F,
334 ) -> Poll<io::Result<P>>
335 where
336 F: FnOnce(&'a mut T) -> io::Result<P>,
337 {
338 self.poll_event_mut(Interest::Write, cx, f)
339 }
340
341 async fn wait_for_event_ready(&self, interest: Interest) -> io::Result<()> {
342 let mut first_call = true;
343 poll_fn(|cx| {
344 if first_call {
345 first_call = false;
346 // First enable the event
347 REACTOR.with(|r| r.enable_event(self.source.0, interest, cx.waker()))?;
348 Poll::Pending
349 } else {
350 // Then, check if the event is ready
351 match REACTOR.with(|r| r.is_event_ready(self.source.0, interest)) {
352 true => Poll::Ready(Ok(())),
353 // If not, then update the event waker
354 false => {
355 REACTOR.with(|r| r.enable_event(self.source.0, interest, cx.waker()))?;
356 Poll::Pending
357 }
358 }
359 }
360 })
361 .await
362 }
363
364 /// Waits until the I/O object is available to write without blocking
365 pub async fn writable(&self) -> io::Result<()> {
366 self.wait_for_event_ready(Interest::Write).await
367 }
368
369 /// Waits until the I/O object is available to read without blocking
370 pub async fn readable(&self) -> io::Result<()> {
371 self.wait_for_event_ready(Interest::Read).await
372 }
373}
374
375impl<T: Read + IoSafe> AsyncRead for Async<T> {
376 fn poll_read(
377 mut self: Pin<&mut Self>,
378 cx: &mut Context,
379 buf: &mut [u8],
380 ) -> Poll<io::Result<usize>> {
381 // Safety: IoSafe is implemented
382 unsafe { self.poll_event_mut(Interest::Read, cx, |inner| inner.read(buf)) }
383 }
384}
385
386impl<'a, T> AsyncRead for &'a Async<T>
387where
388 &'a T: Read + IoSafe,
389{
390 fn poll_read(
391 self: Pin<&mut Self>,
392 cx: &mut Context,
393 buf: &mut [u8],
394 ) -> Poll<io::Result<usize>> {
395 // Safety: IoSafe is implemented
396 unsafe { self.poll_event(Interest::Read, cx, |mut inner| inner.read(buf)) }
397 }
398}
399
400impl<T: Write + IoSafe> AsyncWrite for Async<T> {
401 fn poll_write(
402 mut self: Pin<&mut Self>,
403 cx: &mut Context,
404 buf: &[u8],
405 ) -> Poll<io::Result<usize>> {
406 // Safety: IoSafe is implemented
407 unsafe { self.poll_event_mut(Interest::Write, cx, |inner| inner.write(buf)) }
408 }
409
410 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
411 // Safety: IoSafe is implemented
412 unsafe { self.poll_event_mut(Interest::Write, cx, |inner| inner.flush()) }
413 }
414
415 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
416 self.poll_flush(cx)
417 }
418}
419
420impl<'a, T> AsyncWrite for &'a Async<T>
421where
422 &'a T: Write + IoSafe,
423{
424 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
425 // Safety: IoSafe is implemented
426 unsafe { self.poll_event(Interest::Write, cx, |mut inner| inner.write(buf)) }
427 }
428
429 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
430 // Safety: IoSafe is implemented
431 unsafe { self.poll_event(Interest::Write, cx, |mut inner| inner.flush()) }
432 }
433
434 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
435 self.poll_flush(cx)
436 }
437}
438
439impl<T: BufRead + IoSafe> AsyncBufRead for Async<T> {
440 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<&[u8]>> {
441 let this = self.get_mut();
442 // Safety: IoSafe is implemented
443 unsafe { this.poll_event_mut(Interest::Read, cx, |inner| inner.fill_buf()) }
444 }
445
446 fn consume(mut self: Pin<&mut Self>, amt: usize) {
447 BufRead::consume(&mut self.inner, amt);
448 }
449}
450
451impl Async<TcpListener> {
452 /// Create a TCP listener bound to a specific address
453 ///
454 /// # Example
455 ///
456 /// Bind the TCP listener to an OS-assigned port at 127.0.0.1.
457 ///
458 /// ```no_run
459 /// use std::net::TcpListener;
460 /// use local_runtime::io::Async;
461 ///
462 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
463 /// # Ok::<_, std::io::Error>(())
464 /// ```
465 pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Self> {
466 Async::new(TcpListener::bind(addr.into())?)
467 }
468
469 fn poll_accept(&self, cx: &mut Context) -> Poll<io::Result<(Async<TcpStream>, SocketAddr)>> {
470 // Safety: accept() is I/O safe
471 unsafe {
472 self.poll_event(Interest::Read, cx, |inner| {
473 inner
474 .accept()
475 .and_then(|(st, addr)| Async::new(st).map(|st| (st, addr)))
476 })
477 }
478 }
479
480 /// Accept a new incoming TCP connection from this listener
481 ///
482 /// # Example
483 ///
484 /// ```no_run
485 /// use std::net::TcpListener;
486 /// use local_runtime::io::Async;
487 ///
488 /// # local_runtime::block_on(async {
489 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
490 /// let (stream, addr) = listener.accept().await?;
491 /// # Ok::<_, std::io::Error>(())
492 /// # });
493 /// ```
494 pub async fn accept(&self) -> io::Result<(Async<TcpStream>, SocketAddr)> {
495 poll_fn(|cx| self.poll_accept(cx)).await
496 }
497
498 /// Return a stream of incoming TCP connections
499 ///
500 /// The returned stream will never return `None`.
501 ///
502 /// # Example
503 ///
504 /// ```no_run
505 /// use std::net::TcpListener;
506 /// use local_runtime::io::Async;
507 /// use futures_lite::StreamExt;
508 ///
509 /// # local_runtime::block_on(async {
510 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
511 /// let mut incoming = listener.incoming();
512 /// while let Some(stream) = incoming.next().await {
513 /// let stream = stream?;
514 /// }
515 /// # Ok::<_, std::io::Error>(())
516 /// # });
517 /// ```
518 pub fn incoming(&self) -> IncomingTcp {
519 IncomingTcp { listener: self }
520 }
521}
522
523/// Stream returned by [`Async::<TcpListener>::incoming`]
524#[must_use = "Streams do nothing unless polled"]
525pub struct IncomingTcp<'a> {
526 listener: &'a Async<TcpListener>,
527}
528
529impl Stream for IncomingTcp<'_> {
530 type Item = io::Result<Async<TcpStream>>;
531
532 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
533 self.listener
534 .poll_accept(cx)
535 .map(|pair| pair.map(|(st, _)| st))
536 .map(Some)
537 }
538}
539
540impl Async<TcpStream> {
541 /// Create a TCP connection to the specified address
542 ///
543 /// ```no_run
544 /// use std::net::TcpStream;
545 /// use local_runtime::io::Async;
546 ///
547 /// # local_runtime::block_on(async {
548 /// let listener = Async::<TcpStream>::connect(([127, 0, 0, 1], 8000)).await?;
549 /// # Ok::<_, std::io::Error>(())
550 /// # });
551 /// ```
552 pub async fn connect<A: Into<SocketAddr>>(addr: A) -> io::Result<Self> {
553 let addr = addr.into();
554 let stream = Async::without_nonblocking(tcp_socket(&addr)?)?;
555
556 // Initiate the connection
557 connect(&stream.inner, &addr)?;
558 // Wait for the stream to be writable
559 stream.wait_for_event_ready(Interest::Write).await?;
560 // Check for errors
561 stream.inner.peer_addr()?;
562 Ok(stream)
563 }
564
565 /// Reads data from the stream without removing it from the buffer.
566 ///
567 /// Returns the number of bytes read. Successive calls of this method read the same data.
568 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
569 // Safety: peek() is I/O safe
570 unsafe { poll_fn(|cx| self.poll_event(Interest::Read, cx, |inner| inner.peek(buf))).await }
571 }
572}
573
574#[cfg(unix)]
575fn tcp_socket(addr: &SocketAddr) -> io::Result<TcpStream> {
576 use rustix::net::*;
577
578 let af = match addr {
579 SocketAddr::V4(_) => AddressFamily::INET,
580 SocketAddr::V6(_) => AddressFamily::INET6,
581 };
582 let type_ = SocketType::STREAM;
583
584 #[cfg(any(target_os = "linux", target_os = "android"))]
585 let socket = socket_with(
586 af,
587 type_,
588 SocketFlags::NONBLOCK | SocketFlags::CLOEXEC,
589 None,
590 )?;
591 #[cfg(not(any(target_os = "linux", target_os = "android")))]
592 let socket = {
593 let socket = socket_with(af, type_, SocketFlags::empty(), None)?;
594 let previous = rustix::fs::fcntl_getfl(&socket)?;
595 let new = previous | rustix::fs::OFlags::NONBLOCK | rustix::fs::OFlags::CLOEXEC;
596 if new != previous {
597 rustix::fs::fcntl_setfl(&socket, new)?;
598 }
599 socket
600 };
601
602 Ok(socket.into())
603}
604
605#[cfg(unix)]
606fn connect(tcp: &TcpStream, addr: &SocketAddr) -> io::Result<()> {
607 match rustix::net::connect(tcp.as_fd(), addr) {
608 Ok(()) => Ok(()),
609 Err(rustix::io::Errno::INPROGRESS | rustix::io::Errno::WOULDBLOCK) => Ok(()),
610 Err(err) => Err(err.into()),
611 }
612}
613
614impl Async<UdpSocket> {
615 /// Create a UDP socket from the given address
616 ///
617 /// # Example
618 ///
619 /// ```no_run
620 /// use std::net::UdpSocket;
621 /// use local_runtime::io::Async;
622 ///
623 /// # local_runtime::block_on(async {
624 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
625 /// println!("Bound to {}", socket.get_ref().local_addr()?);
626 /// # Ok::<_, std::io::Error>(())
627 /// # });
628 /// ```
629 pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<UdpSocket>> {
630 Async::new(UdpSocket::bind(addr.into())?)
631 }
632
633 /// Receives a single datagram message on a socket
634 ///
635 /// Returns the number of bytes read and the origin address.
636 ///
637 /// The function must be called with valid byte array `buf` of sufficient size to hold the
638 /// message bytes. If a message is too long to fit in the supplied buffer, excess bytes may be
639 /// discarded.
640 ///
641 /// # Example
642 ///
643 /// ```no_run
644 /// use std::net::UdpSocket;
645 /// use local_runtime::io::Async;
646 ///
647 /// # local_runtime::block_on(async {
648 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
649 ///
650 /// let mut buf = [0u8; 1024];
651 /// let (len, addr) = socket.recv_from(&mut buf).await?;
652 /// # Ok::<_, std::io::Error>(())
653 /// # });
654 /// ```
655 pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
656 poll_fn(|cx| unsafe { self.poll_read_with(cx, |inner| inner.recv_from(buf)) }).await
657 }
658
659 /// Receives a single datagram message without removing it from the queue
660 ///
661 /// Returns the number of bytes read and the origin address.
662 ///
663 /// The function must be called with valid byte array `buf` of sufficient size to hold the
664 /// message bytes. If a message is too long to fit in the supplied buffer, excess bytes may be
665 /// discarded.
666 pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
667 poll_fn(|cx| unsafe { self.poll_read_with(cx, |inner| inner.peek_from(buf)) }).await
668 }
669
670 /// Send data to the specified address
671 ///
672 /// Return the number of bytes written
673 ///
674 /// # Example
675 ///
676 /// ```no_run
677 /// use std::net::UdpSocket;
678 /// use local_runtime::io::Async;
679 ///
680 /// # local_runtime::block_on(async {
681 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
682 /// let addr = socket.get_ref().local_addr()?;
683 ///
684 /// let len = socket.send_to(b"hello", addr).await?;
685 /// # Ok::<_, std::io::Error>(())
686 /// # });
687 /// ```
688 pub async fn send_to<A: Into<SocketAddr>>(&self, buf: &[u8], addr: A) -> io::Result<usize> {
689 let addr = addr.into();
690 poll_fn(|cx| unsafe { self.poll_write_with(cx, |inner| inner.send_to(buf, addr)) }).await
691 }
692
693 /// Connect this UDP socket to a remote address, allowing the [`send`](Async::send) and
694 /// [`recv`](Async::recv) methods to be called
695 ///
696 /// Also applies filters to only receive data from the specified address.
697 pub fn connect<A: Into<SocketAddr>>(&self, addr: A) -> io::Result<()> {
698 self.get_ref().connect(addr.into())
699 }
700
701 /// Receives a single datagram message from the connected peer
702 ///
703 /// Returns the number of bytes read.
704 ///
705 /// The function must be called with valid byte array `buf` of sufficient size to hold the
706 /// message bytes. If a message is too long to fit in the supplied buffer, excess bytes may be
707 /// discarded.
708 ///
709 /// This method should only be called after connecting the socket to a remote address via the
710 /// [`connect`](Async::<UdpSocket>::connect) method.
711 ///
712 /// # Example
713 ///
714 /// ```no_run
715 /// use std::net::UdpSocket;
716 /// use local_runtime::io::Async;
717 ///
718 /// # local_runtime::block_on(async {
719 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
720 /// socket.connect(([127, 0, 0, 1], 9000))?;
721 ///
722 /// let mut buf = [0u8; 1024];
723 /// let len = socket.recv(&mut buf).await?;
724 /// # Ok::<_, std::io::Error>(())
725 /// # });
726 /// ```
727 pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
728 poll_fn(|cx| unsafe { self.poll_read_with(cx, |inner| inner.recv(buf)) }).await
729 }
730
731 /// Receives a single datagram message from the connected peer without removing it from the queue
732 ///
733 /// Returns the number of bytes read.
734 ///
735 /// The function must be called with valid byte array `buf` of sufficient size to hold the
736 /// message bytes. If a message is too long to fit in the supplied buffer, excess bytes may be
737 /// discarded.
738 ///
739 /// This method should only be called after connecting the socket to a remote address via the
740 /// [`connect`](Async::<UdpSocket>::connect) method.
741 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
742 poll_fn(|cx| unsafe { self.poll_read_with(cx, |inner| inner.peek(buf)) }).await
743 }
744
745 /// Send data to the connected peer
746 ///
747 /// Return the number of bytes written.
748 ///
749 /// This method should only be called after connecting the socket to a remote address via the
750 /// [`connect`](Async::<UdpSocket>::connect) method.
751 ///
752 /// # Example
753 ///
754 /// ```no_run
755 /// use std::net::UdpSocket;
756 /// use local_runtime::io::Async;
757 ///
758 /// # local_runtime::block_on(async {
759 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
760 /// socket.connect(([127, 0, 0, 1], 9000))?;
761 ///
762 /// let len = socket.send(b"hello").await?;
763 /// # Ok::<_, std::io::Error>(())
764 /// # });
765 /// ```
766 pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
767 poll_fn(|cx| unsafe { self.poll_write_with(cx, |inner| inner.send(buf)) }).await
768 }
769}
770
#[cfg(test)]
mod tests {
    use std::{future::Future, io::stderr, pin::pin, sync::Arc};

    use rustix::pipe::pipe;

    use crate::{block_on, test::MockWaker};

    use super::*;

    /// Dropping an `Async` must remove its registration from the reactor.
    #[test]
    fn deregister_on_drop() {
        // Unwrap so that a registration failure is reported directly, instead of surfacing as a
        // confusing "reactor is empty" assertion failure below
        let io = Async::without_nonblocking(stderr()).unwrap();
        assert!(!REACTOR.with(|r| r.is_empty()));
        drop(io);
        assert!(REACTOR.with(|r| r.is_empty()));
    }

    /// `into_inner` must remove the registration while handing the I/O object back.
    #[test]
    fn deregister_into_inner() {
        let io = Async::without_nonblocking(stderr()).unwrap();
        assert!(!REACTOR.with(|r| r.is_empty()));
        let _inner = io.into_inner();
        assert!(REACTOR.with(|r| r.is_empty()));
    }

    /// Accept and connect are both pending on first poll and complete under `block_on`.
    #[test]
    fn tcp() {
        let accept_waker = Arc::new(MockWaker::default());
        let connect_waker = Arc::new(MockWaker::default());

        let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0)).unwrap();
        let addr = listener.get_ref().local_addr().unwrap();
        let mut accept = pin!(listener.accept());
        assert!(accept
            .as_mut()
            .poll(&mut Context::from_waker(&accept_waker.clone().into()))
            .is_pending());
        let mut connect = pin!(Async::<TcpStream>::connect(addr));
        assert!(connect
            .as_mut()
            .poll(&mut Context::from_waker(&connect_waker.clone().into()))
            .is_pending());

        block_on(async {
            let _accepted = accept.await.unwrap();
            let _connected = connect.await.unwrap();
        });

        // A fresh connect future is pending again on its first poll
        let mut connect = pin!(Async::<TcpStream>::connect(addr));
        assert!(connect
            .as_mut()
            .poll(&mut Context::from_waker(&connect_waker.into()))
            .is_pending());
    }

    /// `writable` and `readable` resolve once the reactor has reported the matching event.
    #[test]
    fn writable_readable() {
        let wr_waker = Arc::new(MockWaker::default());
        let rd_waker = Arc::new(MockWaker::default());

        let (read, write) = pipe().unwrap();
        set_nonblocking(read.as_fd()).unwrap();
        set_nonblocking(write.as_fd()).unwrap();

        let reader = Async::new(read).unwrap();
        let writer = Async::new(write).unwrap();

        let mut writable = pin!(writer.writable());
        assert!(writable
            .as_mut()
            .poll(&mut Context::from_waker(&wr_waker.clone().into()))
            .is_pending());
        REACTOR.with(|r| r.wait()).unwrap();
        assert!(wr_waker.get());
        assert!(writable
            .as_mut()
            .poll(&mut Context::from_waker(&wr_waker.clone().into()))
            .is_ready());

        let mut readable = pin!(reader.readable());
        assert!(readable
            .as_mut()
            .poll(&mut Context::from_waker(&rd_waker.clone().into()))
            .is_pending());
        // Write one byte to pipe
        unsafe {
            assert!(writer
                .poll_write_with(&mut Context::from_waker(&wr_waker.clone().into()), |w| {
                    rustix::io::write(w, &[0]).map_err(Into::into)
                })
                .is_ready());
        };
        REACTOR.with(|r| r.wait()).unwrap();
        assert!(rd_waker.get());
        assert!(readable
            .as_mut()
            .poll(&mut Context::from_waker(&rd_waker.clone().into()))
            .is_ready());
    }
}
871}