a10/io/mod.rs

//! Type definitions for I/O functionality.
//!
//! The main types of this module are the [`Buf`] and [`BufMut`] traits, which
//! define the requirements on buffers used by the I/O system calls on a file
//! descriptor ([`AsyncFd`]). Additionally, the [`BufSlice`] and [`BufMutSlice`]
//! traits exist to define the behaviour of buffers in vectored I/O.
//!
//! A specialised read buffer pool implementation exists in [`ReadBufPool`],
//! which is a buffer pool managed by the kernel when making `read(2)`-like
//! system calls.
//!
//! Finally, we have the [`stdin`], [`stdout`] and [`stderr`] functions to create
//! `AsyncFd`s for standard in, out and error respectively.
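//!
//! # Examples
//!
//! A minimal sketch (the crate paths and how the ring is set up are assumptions,
//! not shown here): `sq` is a [`SubmissionQueue`] obtained from an already
//! configured ring.
//!
//! ```ignore
//! use a10::io::stdout;
//!
//! async fn greet(sq: a10::SubmissionQueue) -> std::io::Result<()> {
//!     let stdout = stdout(sq);
//!     // The buffer is passed by value and returned on completion, see the
//!     // `Buf` trait below. `write_all` loops until the whole buffer is
//!     // written.
//!     stdout.write_all("Hello, world!\n").await?;
//!     Ok(())
//! }
//! ```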

// This is not ideal.
// This should only be applied to `ReadVectored` and `WriteVectored` as they use
// `libc::iovec` internally, which is `!Send`, while the operations actually are
// `Send`.
#![allow(clippy::non_send_fields_in_send_ty)]

use std::future::Future;
use std::marker::PhantomData;
use std::mem::{ManuallyDrop, MaybeUninit};
use std::os::fd::RawFd;
use std::pin::Pin;
use std::task::{self, Poll};
use std::{io, ptr};

use crate::cancel::{Cancel, CancelOp, CancelResult};
use crate::extract::{Extract, Extractor};
use crate::fd::{AsyncFd, Descriptor, File};
use crate::op::{op_future, poll_state, OpState, NO_OFFSET};
use crate::{libc, man_link, SubmissionQueue};

mod read_buf;
#[doc(hidden)]
pub use read_buf::{BufGroupId, BufIdx};
pub use read_buf::{ReadBuf, ReadBufPool};

// Re-export so we don't have to worry about importing both `std::io` and
// `crate::io`.
pub(crate) use std::io::*;

macro_rules! stdio {
    (
        $fn: ident () -> $name: ident, $fd: expr
    ) => {
        #[doc = concat!("Create a new `", stringify!($name), "`.\n\n")]
        pub fn $fn(sq: $crate::SubmissionQueue) -> $name {
            unsafe { $name(std::mem::ManuallyDrop::new($crate::AsyncFd::from_raw_fd($fd, sq))) }
        }

        #[doc = concat!(
            "An [`AsyncFd`] for ", stringify!($fn), ".\n\n",
            "# Notes\n\n",
            "This directly writes to the raw file descriptor, which means it's not buffered and will not flush anything buffered by the standard library.\n\n",
            "When this type is dropped it will not close ", stringify!($fn), ".",
        )]
        pub struct $name(std::mem::ManuallyDrop<$crate::AsyncFd>);

        impl std::ops::Deref for $name {
            type Target = $crate::AsyncFd;

            fn deref(&self) -> &Self::Target {
                &self.0
            }
        }

        impl std::fmt::Debug for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                f.debug_struct(stringify!($name))
                    .field("fd", &*self.0)
                    .finish()
            }
        }

        impl std::ops::Drop for $name {
            fn drop(&mut self) {
                // We don't want to close the file descriptor, but we do need to
                // drop our reference to the submission queue.
                // SAFETY: with `ManuallyDrop` we don't drop the `AsyncFd` so
                // it's not dropped twice. Otherwise we get access to it using
                // safe methods.
                unsafe { std::ptr::drop_in_place(&mut self.0.sq) };
            }
        }
    };
}

stdio!(stdin() -> Stdin, libc::STDIN_FILENO);
stdio!(stdout() -> Stdout, libc::STDOUT_FILENO);
stdio!(stderr() -> Stderr, libc::STDERR_FILENO);

/// I/O system calls.
impl<D: Descriptor> AsyncFd<D> {
    /// Read from this fd into `buf`.
    #[doc = man_link!(read(2))]
    pub const fn read<'fd, B>(&'fd self, buf: B) -> Read<'fd, B, D>
    where
        B: BufMut,
    {
        self.read_at(buf, NO_OFFSET)
    }

    /// Read from this fd into `buf` starting at `offset`.
    ///
    /// The current file cursor is not affected by this function. This means
    /// that a call to `read_at(buf, 1024)` with a buffer of 1 kB will **not**
    /// continue reading at 2 kB in the next call to `read`.
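    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `fd` is an [`AsyncFd`] for a regular file
    /// (how it was opened is not shown here).
    ///
    /// ```ignore
    /// // Read up to 4096 bytes starting at byte offset 1024; the buffer is
    /// // returned once the kernel has completed the read.
    /// let buf = fd.read_at(Vec::with_capacity(4096), 1024).await?;
    /// println!("read {} bytes", buf.len());
    /// ```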
    pub const fn read_at<'fd, B>(&'fd self, buf: B, offset: u64) -> Read<'fd, B, D>
    where
        B: BufMut,
    {
        Read::new(self, buf, offset)
    }

    /// Read at least `n` bytes from this fd into `buf`.
    pub const fn read_n<'fd, B>(&'fd self, buf: B, n: usize) -> ReadN<'fd, B, D>
    where
        B: BufMut,
    {
        self.read_n_at(buf, NO_OFFSET, n)
    }

    /// Read at least `n` bytes from this fd into `buf` starting at `offset`.
    ///
    /// The current file cursor is not affected by this function.
    pub const fn read_n_at<'fd, B>(&'fd self, buf: B, offset: u64, n: usize) -> ReadN<'fd, B, D>
    where
        B: BufMut,
    {
        ReadN::new(self, buf, offset, n)
    }

    /// Read from this fd into `bufs`.
    #[doc = man_link!(readv(2))]
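    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `fd` is an [`AsyncFd`] (creation not shown
    /// here). An array of [`BufMut`] buffers implements [`BufMutSlice`].
    ///
    /// ```ignore
    /// let bufs = [Vec::with_capacity(1024), Vec::with_capacity(1024)];
    /// let bufs = fd.read_vectored(bufs).await?;
    /// ```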
    pub fn read_vectored<'fd, B, const N: usize>(&'fd self, bufs: B) -> ReadVectored<'fd, B, N, D>
    where
        B: BufMutSlice<N>,
    {
        self.read_vectored_at(bufs, NO_OFFSET)
    }

    /// Read from this fd into `bufs` starting at `offset`.
    ///
    /// The current file cursor is not affected by this function.
    pub fn read_vectored_at<'fd, B, const N: usize>(
        &'fd self,
        mut bufs: B,
        offset: u64,
    ) -> ReadVectored<'fd, B, N, D>
    where
        B: BufMutSlice<N>,
    {
        let iovecs = unsafe { bufs.as_iovecs_mut() };
        ReadVectored::new(self, bufs, iovecs, offset)
    }

    /// Read at least `n` bytes from this fd into `bufs`.
    pub fn read_n_vectored<'fd, B, const N: usize>(
        &'fd self,
        bufs: B,
        n: usize,
    ) -> ReadNVectored<'fd, B, N, D>
    where
        B: BufMutSlice<N>,
    {
        self.read_n_vectored_at(bufs, NO_OFFSET, n)
    }

    /// Read at least `n` bytes from this fd into `bufs` starting at `offset`.
    ///
    /// The current file cursor is not affected by this function.
    pub fn read_n_vectored_at<'fd, B, const N: usize>(
        &'fd self,
        bufs: B,
        offset: u64,
        n: usize,
    ) -> ReadNVectored<'fd, B, N, D>
    where
        B: BufMutSlice<N>,
    {
        ReadNVectored::new(self, bufs, offset, n)
    }

    /// Write `buf` to this fd.
    #[doc = man_link!(write(2))]
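    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `fd` is an [`AsyncFd`] (creation not shown
    /// here). A single `write` may write fewer bytes than the buffer holds,
    /// use [`AsyncFd::write_all`] to write the entire buffer.
    ///
    /// ```ignore
    /// let n = fd.write("some bytes").await?;
    /// println!("wrote {n} bytes");
    /// ```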
    pub const fn write<'fd, B>(&'fd self, buf: B) -> Write<'fd, B, D>
    where
        B: Buf,
    {
        self.write_at(buf, NO_OFFSET)
    }

    /// Write `buf` to this fd at `offset`.
    ///
    /// The current file cursor is not affected by this function.
    pub const fn write_at<'fd, B>(&'fd self, buf: B, offset: u64) -> Write<'fd, B, D>
    where
        B: Buf,
    {
        Write::new(self, buf, offset)
    }

    /// Write all of `buf` to this fd.
    pub const fn write_all<'fd, B>(&'fd self, buf: B) -> WriteAll<'fd, B, D>
    where
        B: Buf,
    {
        self.write_all_at(buf, NO_OFFSET)
    }

    /// Write all of `buf` to this fd at `offset`.
    ///
    /// The current file cursor is not affected by this function.
    pub const fn write_all_at<'fd, B>(&'fd self, buf: B, offset: u64) -> WriteAll<'fd, B, D>
    where
        B: Buf,
    {
        WriteAll::new(self, buf, offset)
    }

    /// Write `bufs` to this file.
    #[doc = man_link!(writev(2))]
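    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `fd` is an [`AsyncFd`] (creation not shown
    /// here). Tuples of up to eight [`Buf`] buffers implement [`BufSlice`], so
    /// buffers of different types can be written in a single call.
    ///
    /// ```ignore
    /// let n = fd.write_vectored(("Hello, ", "world!\n")).await?;
    /// println!("wrote {n} bytes");
    /// ```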
    pub fn write_vectored<'fd, B, const N: usize>(&'fd self, bufs: B) -> WriteVectored<'fd, B, N, D>
    where
        B: BufSlice<N>,
    {
        self.write_vectored_at(bufs, NO_OFFSET)
    }

    /// Write `bufs` to this file at `offset`.
    ///
    /// The current file cursor is not affected by this function.
    pub fn write_vectored_at<'fd, B, const N: usize>(
        &'fd self,
        bufs: B,
        offset: u64,
    ) -> WriteVectored<'fd, B, N, D>
    where
        B: BufSlice<N>,
    {
        let iovecs = unsafe { bufs.as_iovecs() };
        WriteVectored::new(self, bufs, iovecs, offset)
    }

    /// Write all `bufs` to this file.
    pub fn write_all_vectored<'fd, B, const N: usize>(
        &'fd self,
        bufs: B,
    ) -> WriteAllVectored<'fd, B, N, D>
    where
        B: BufSlice<N>,
    {
        self.write_all_vectored_at(bufs, NO_OFFSET)
    }

    /// Write all `bufs` to this file at `offset`.
    ///
    /// The current file cursor is not affected by this function.
    pub fn write_all_vectored_at<'fd, B, const N: usize>(
        &'fd self,
        bufs: B,
        offset: u64,
    ) -> WriteAllVectored<'fd, B, N, D>
    where
        B: BufSlice<N>,
    {
        WriteAllVectored::new(self, bufs, offset)
    }

    /// Splice `length` bytes to `target` fd.
    ///
    /// See the `splice(2)` manual for correct usage.
    #[doc = man_link!(splice(2))]
    #[doc(alias = "splice")]
    pub const fn splice_to<'fd>(
        &'fd self,
        target: RawFd,
        length: u32,
        flags: libc::c_int,
    ) -> Splice<'fd, D> {
        self.splice_to_at(NO_OFFSET, target, NO_OFFSET, length, flags)
    }

    /// Same as [`AsyncFd::splice_to`], but starts reading data at `offset` from
    /// the file (instead of the current position of the read cursor) and starts
    /// writing at `target_offset` to `target`.
    pub const fn splice_to_at<'fd>(
        &'fd self,
        offset: u64,
        target: RawFd,
        target_offset: u64,
        length: u32,
        flags: libc::c_int,
    ) -> Splice<'fd, D> {
        self.splice(
            target,
            SpliceDirection::To,
            offset,
            target_offset,
            length,
            flags,
        )
    }

    /// Splice `length` bytes from `target` fd.
    ///
    /// See the `splice(2)` manual for correct usage.
    #[doc(alias = "splice")]
    pub const fn splice_from<'fd>(
        &'fd self,
        target: RawFd,
        length: u32,
        flags: libc::c_int,
    ) -> Splice<'fd, D> {
        self.splice_from_at(NO_OFFSET, target, NO_OFFSET, length, flags)
    }

    /// Same as [`AsyncFd::splice_from`], but starts writing at `offset` into
    /// the file (instead of at the current position of the write cursor) and
    /// starts reading at `target_offset` from `target`.
    #[doc(alias = "splice")]
    pub const fn splice_from_at<'fd>(
        &'fd self,
        offset: u64,
        target: RawFd,
        target_offset: u64,
        length: u32,
        flags: libc::c_int,
    ) -> Splice<'fd, D> {
        self.splice(
            target,
            SpliceDirection::From,
            target_offset,
            offset,
            length,
            flags,
        )
    }

339
340    const fn splice<'fd>(
341        &'fd self,
342        target: RawFd,
343        direction: SpliceDirection,
344        off_in: u64,
345        off_out: u64,
346        length: u32,
347        flags: libc::c_int,
348    ) -> Splice<'fd, D> {
349        Splice::new(self, (target, direction, off_in, off_out, length, flags))
350    }
351
352    /// Explicitly close the file descriptor.
353    ///
354    /// # Notes
355    ///
356    /// This happens automatically on drop, this can be used to get a possible
357    /// error.
358    #[doc = man_link!(close(2))]
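    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `fd` is an [`AsyncFd`] we're done with (how
    /// it was opened is not shown here).
    ///
    /// ```ignore
    /// // Unlike simply dropping `fd`, this surfaces any error from `close(2)`.
    /// fd.close().await?;
    /// ```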
    pub fn close(self) -> Close<D> {
        // We deconstruct `self` without dropping it to avoid closing the fd
        // twice.
        let this = ManuallyDrop::new(self);
        // SAFETY: this is safe because we ensure the pointers are valid and
        // don't touch `this` after reading the fields.
        let fd = this.fd();
        let sq = unsafe { ptr::read(&this.sq) };

        Close {
            sq,
            state: OpState::NotStarted(fd),
            kind: PhantomData,
        }
    }
}

// Read.
op_future! {
    fn AsyncFd::read -> B,
    struct Read<'fd, B: BufMut> {
        /// Buffer to write into; it needs to stay in memory so the kernel can
        /// access it safely.
        buf: B,
    },
    drop_using: Box,
    setup_state: offset: u64,
    setup: |submission, fd, (buf,), offset| unsafe {
        let (ptr, len) = buf.parts_mut();
        submission.read_at(fd.fd(), ptr, len, offset);
        if let Some(buf_group) = buf.buffer_group() {
            submission.set_buffer_select(buf_group.0);
        }
    },
    map_result: |this, (mut buf,), buf_idx, n| {
        // SAFETY: the kernel initialised the bytes for us as part of the read
        // call.
        #[allow(clippy::cast_sign_loss)] // Negative values are mapped to errors.
        unsafe { buf.buffer_init(BufIdx(buf_idx), n as u32) };
        Ok(buf)
    },
}

/// [`Future`] behind [`AsyncFd::read_n`].
#[derive(Debug)]
pub struct ReadN<'fd, B, D: Descriptor = File> {
    read: Read<'fd, ReadNBuf<B>, D>,
    offset: u64,
    /// Number of bytes we still need to read to hit our target `n`.
    left: usize,
}

impl<'fd, B: BufMut, D: Descriptor> ReadN<'fd, B, D> {
    const fn new(fd: &'fd AsyncFd<D>, buf: B, offset: u64, n: usize) -> ReadN<'fd, B, D> {
        let buf = ReadNBuf { buf, last_read: 0 };
        ReadN {
            read: fd.read_at(buf, offset),
            offset,
            left: n,
        }
    }
}

impl<'fd, B, D: Descriptor> Cancel for ReadN<'fd, B, D> {
    fn try_cancel(&mut self) -> CancelResult {
        self.read.try_cancel()
    }

    fn cancel(&mut self) -> CancelOp {
        self.read.cancel()
    }
}

impl<'fd, B: BufMut, D: Descriptor> Future for ReadN<'fd, B, D> {
    type Output = io::Result<B>;

    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // SAFETY: not moving `Future`.
        let this = unsafe { Pin::into_inner_unchecked(self) };
        let mut read = unsafe { Pin::new_unchecked(&mut this.read) };
        match read.as_mut().poll(ctx) {
            Poll::Ready(Ok(buf)) => {
                if buf.last_read == 0 {
                    return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
                }

                if buf.last_read >= this.left {
                    // Read the required amount of bytes.
                    return Poll::Ready(Ok(buf.buf));
                }

                this.left -= buf.last_read;
                if this.offset != NO_OFFSET {
                    this.offset += buf.last_read as u64;
                }

                read.set(read.fd.read_at(buf, this.offset));
                unsafe { Pin::new_unchecked(this) }.poll(ctx)
            }
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Pending => Poll::Pending,
        }
    }
}

// ReadVectored.
op_future! {
    fn AsyncFd::read_vectored -> B,
    struct ReadVectored<'fd, B: BufMutSlice<N>; const N: usize> {
        /// Buffers to write into; they need to stay in memory so the kernel can
        /// access them safely.
        bufs: B,
        /// Buffer references used by the kernel.
        ///
        /// NOTE: we only need these iovecs in the submission, we don't have to
        /// keep them around during the operation. Because of this we don't heap
        /// allocate them like we do for other operations. This leaves a small
        /// duration between the submission of the entry and the submission
        /// being read by the kernel in which this future could be dropped and
        /// the kernel would read memory we don't own. However, because we wake
        /// the kernel after submitting the entry, it's not really worth the
        /// heap allocation.
        iovecs: [libc::iovec; N],
    },
    drop_using: Box,
    /// `iovecs` can't move until the kernel has read the submission.
    impl !Unpin,
    setup_state: offset: u64,
    setup: |submission, fd, (_bufs, iovecs), offset| unsafe {
        submission.read_vectored_at(fd.fd(), iovecs, offset);
    },
    map_result: |this, (mut bufs, _iovecs), _flags, n| {
        // SAFETY: the kernel initialised the bytes for us as part of the read
        // call.
        #[allow(clippy::cast_sign_loss)] // Negative values are mapped to errors.
        unsafe { bufs.set_init(n as usize) };
        Ok(bufs)
    },
}

/// [`Future`] behind [`AsyncFd::read_n_vectored`].
#[derive(Debug)]
pub struct ReadNVectored<'fd, B, const N: usize, D: Descriptor = File> {
    read: ReadVectored<'fd, ReadNBuf<B>, N, D>,
    offset: u64,
    /// Number of bytes we still need to read to hit our target `n`.
    left: usize,
}

impl<'fd, B: BufMutSlice<N>, const N: usize, D: Descriptor> ReadNVectored<'fd, B, N, D> {
    fn new(fd: &'fd AsyncFd<D>, bufs: B, offset: u64, n: usize) -> ReadNVectored<'fd, B, N, D> {
        let bufs = ReadNBuf {
            buf: bufs,
            last_read: 0,
        };
        ReadNVectored {
            read: fd.read_vectored_at(bufs, offset),
            offset,
            left: n,
        }
    }
}

impl<'fd, B, const N: usize, D: Descriptor> Cancel for ReadNVectored<'fd, B, N, D> {
    fn try_cancel(&mut self) -> CancelResult {
        self.read.try_cancel()
    }

    fn cancel(&mut self) -> CancelOp {
        self.read.cancel()
    }
}

impl<'fd, B: BufMutSlice<N>, const N: usize, D: Descriptor> Future for ReadNVectored<'fd, B, N, D> {
    type Output = io::Result<B>;

    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // SAFETY: not moving `Future`.
        let this = unsafe { Pin::into_inner_unchecked(self) };
        let mut read = unsafe { Pin::new_unchecked(&mut this.read) };
        match read.as_mut().poll(ctx) {
            Poll::Ready(Ok(bufs)) => {
                if bufs.last_read == 0 {
                    return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
                }

                if bufs.last_read >= this.left {
                    // Read the required amount of bytes.
                    return Poll::Ready(Ok(bufs.buf));
                }

                this.left -= bufs.last_read;
                if this.offset != NO_OFFSET {
                    this.offset += bufs.last_read as u64;
                }

                read.set(read.fd.read_vectored_at(bufs, this.offset));
                unsafe { Pin::new_unchecked(this) }.poll(ctx)
            }
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Wrapper around a buffer `B` to keep track of the number of bytes read.
#[derive(Debug)]
pub(crate) struct ReadNBuf<B> {
    pub(crate) buf: B,
    pub(crate) last_read: usize,
}

unsafe impl<B: BufMut> BufMut for ReadNBuf<B> {
    unsafe fn parts_mut(&mut self) -> (*mut u8, u32) {
        self.buf.parts_mut()
    }

    unsafe fn set_init(&mut self, n: usize) {
        self.last_read = n;
        self.buf.set_init(n);
    }
}

unsafe impl<B: BufMutSlice<N>, const N: usize> BufMutSlice<N> for ReadNBuf<B> {
    unsafe fn as_iovecs_mut(&mut self) -> [libc::iovec; N] {
        self.buf.as_iovecs_mut()
    }

    unsafe fn set_init(&mut self, n: usize) {
        self.last_read = n;
        self.buf.set_init(n);
    }
}

// Write.
op_future! {
    fn AsyncFd::write -> usize,
    struct Write<'fd, B: Buf> {
        /// Buffer to read from; it needs to stay in memory so the kernel can
        /// access it safely.
        buf: B,
    },
    drop_using: Box,
    setup_state: offset: u64,
    setup: |submission, fd, (buf,), offset| unsafe {
        let (ptr, len) = buf.parts();
        submission.write_at(fd.fd(), ptr, len, offset);
    },
    map_result: |n| {
        #[allow(clippy::cast_sign_loss)] // Negative values are mapped to errors.
        Ok(n as usize)
    },
    extract: |this, (buf,), n| -> (B, usize) {
        #[allow(clippy::cast_sign_loss)] // Negative values are mapped to errors.
        Ok((buf, n as usize))
    },
}

/// [`Future`] behind [`AsyncFd::write_all`].
#[derive(Debug)]
pub struct WriteAll<'fd, B, D: Descriptor = File> {
    write: Extractor<Write<'fd, SkipBuf<B>, D>>,
    offset: u64,
}

impl<'fd, B: Buf, D: Descriptor> WriteAll<'fd, B, D> {
    const fn new(fd: &'fd AsyncFd<D>, buf: B, offset: u64) -> WriteAll<'fd, B, D> {
        let buf = SkipBuf { buf, skip: 0 };
        WriteAll {
            // TODO: once `Extract` is a constant trait use that.
            write: Extractor {
                fut: fd.write_at(buf, offset),
            },
            offset,
        }
    }

    /// Poll implementation used by the [`Future`] implementation for the naked
    /// type and for the type wrapped in an [`Extractor`].
    fn inner_poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<io::Result<B>> {
        // SAFETY: not moving `Future`.
        let this = unsafe { Pin::into_inner_unchecked(self) };
        let mut write = unsafe { Pin::new_unchecked(&mut this.write) };
        match write.as_mut().poll(ctx) {
            Poll::Ready(Ok((_, 0))) => Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
            Poll::Ready(Ok((mut buf, n))) => {
                buf.skip += n as u32;
                if this.offset != NO_OFFSET {
                    this.offset += n as u64;
                }

                if let (_, 0) = unsafe { buf.parts() } {
                    // Written everything.
                    return Poll::Ready(Ok(buf.buf));
                }

                write.set(write.fut.fd.write_at(buf, this.offset).extract());
                unsafe { Pin::new_unchecked(this) }.inner_poll(ctx)
            }
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<'fd, B, D: Descriptor> Cancel for WriteAll<'fd, B, D> {
    fn try_cancel(&mut self) -> CancelResult {
        self.write.try_cancel()
    }

    fn cancel(&mut self) -> CancelOp {
        self.write.cancel()
    }
}

impl<'fd, B: Buf, D: Descriptor> Future for WriteAll<'fd, B, D> {
    type Output = io::Result<()>;

    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
        self.inner_poll(ctx).map_ok(|_| ())
    }
}

impl<'fd, B: Buf, D: Descriptor> Extract for WriteAll<'fd, B, D> {}

impl<'fd, B: Buf, D: Descriptor> Future for Extractor<WriteAll<'fd, B, D>> {
    type Output = io::Result<B>;

    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
        unsafe { Pin::map_unchecked_mut(self, |s| &mut s.fut) }.inner_poll(ctx)
    }
}

/// Wrapper around a buffer `B` to skip a number of bytes.
#[derive(Debug)]
pub(crate) struct SkipBuf<B> {
    pub(crate) buf: B,
    pub(crate) skip: u32,
}

unsafe impl<B: Buf> Buf for SkipBuf<B> {
    unsafe fn parts(&self) -> (*const u8, u32) {
        let (ptr, size) = self.buf.parts();
        if self.skip >= size {
            (ptr, 0)
        } else {
            (ptr.add(self.skip as usize), size - self.skip)
        }
    }
}

// WriteVectored.
op_future! {
    fn AsyncFd::write_vectored -> usize,
    struct WriteVectored<'fd, B: BufSlice<N>; const N: usize> {
        /// Buffers to read from; they need to stay in memory so the kernel can
        /// access them safely.
        bufs: B,
        /// Buffer references used by the kernel.
        ///
        /// NOTE: we only need these iovecs in the submission, we don't have to
        /// keep them around during the operation. Because of this we don't heap
        /// allocate them like we do for other operations. This leaves a small
        /// duration between the submission of the entry and the submission
        /// being read by the kernel in which this future could be dropped and
        /// the kernel would read memory we don't own. However, because we wake
        /// the kernel after submitting the entry, it's not really worth the
        /// heap allocation.
        iovecs: [libc::iovec; N],
    },
    drop_using: Box,
    /// `iovecs` can't move until the kernel has read the submission.
    impl !Unpin,
    setup_state: offset: u64,
    setup: |submission, fd, (_bufs, iovecs), offset| unsafe {
        submission.write_vectored_at(fd.fd(), iovecs, offset);
    },
    map_result: |n| {
        #[allow(clippy::cast_sign_loss)] // Negative values are mapped to errors.
        Ok(n as usize)
    },
    extract: |this, (buf, _iovecs), n| -> (B, usize) {
        #[allow(clippy::cast_sign_loss)] // Negative values are mapped to errors.
        Ok((buf, n as usize))
    },
}

/// [`Future`] behind [`AsyncFd::write_all_vectored`].
#[derive(Debug)]
pub struct WriteAllVectored<'fd, B, const N: usize, D: Descriptor = File> {
    write: Extractor<WriteVectored<'fd, B, N, D>>,
    offset: u64,
    skip: u64,
}

impl<'fd, B: BufSlice<N>, const N: usize, D: Descriptor> WriteAllVectored<'fd, B, N, D> {
    fn new(fd: &'fd AsyncFd<D>, buf: B, offset: u64) -> WriteAllVectored<'fd, B, N, D> {
        WriteAllVectored {
            write: fd.write_vectored_at(buf, offset).extract(),
            offset,
            skip: 0,
        }
    }

    /// Poll implementation used by the [`Future`] implementation for the naked
    /// type and for the type wrapped in an [`Extractor`].
    fn inner_poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<io::Result<B>> {
        // SAFETY: not moving `Future`.
        let this = unsafe { Pin::into_inner_unchecked(self) };
        let mut write = unsafe { Pin::new_unchecked(&mut this.write) };
        match write.as_mut().poll(ctx) {
            Poll::Ready(Ok((_, 0))) => Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
            Poll::Ready(Ok((bufs, n))) => {
                this.skip += n as u64;
                if this.offset != NO_OFFSET {
                    this.offset += n as u64;
                }

                let mut iovecs = unsafe { bufs.as_iovecs() };
                let mut skip = this.skip;
                for iovec in &mut iovecs {
                    if iovec.iov_len as u64 <= skip {
                        // Skip entire buf.
                        skip -= iovec.iov_len as u64;
                        iovec.iov_len = 0;
                    } else {
                        // Partially written buffer: advance the pointer past
                        // the bytes already written and shrink the length, so
                        // we don't write the same bytes again.
                        iovec.iov_base =
                            unsafe { iovec.iov_base.cast::<u8>().add(skip as usize).cast() };
                        iovec.iov_len -= skip as usize;
                        break;
                    }
                }

                if iovecs[N - 1].iov_len == 0 {
                    // Written everything.
                    return Poll::Ready(Ok(bufs));
                }

                write.set(WriteVectored::new(write.fut.fd, bufs, iovecs, this.offset).extract());
                unsafe { Pin::new_unchecked(this) }.inner_poll(ctx)
            }
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<'fd, B, const N: usize, D: Descriptor> Cancel for WriteAllVectored<'fd, B, N, D> {
    fn try_cancel(&mut self) -> CancelResult {
        self.write.try_cancel()
    }

    fn cancel(&mut self) -> CancelOp {
        self.write.cancel()
    }
}

impl<'fd, B: BufSlice<N>, const N: usize, D: Descriptor> Future for WriteAllVectored<'fd, B, N, D> {
    type Output = io::Result<()>;

    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
        self.inner_poll(ctx).map_ok(|_| ())
    }
}

impl<'fd, B: BufSlice<N>, const N: usize, D: Descriptor> Extract
    for WriteAllVectored<'fd, B, N, D>
{
}

impl<'fd, B: BufSlice<N>, const N: usize, D: Descriptor> Future
    for Extractor<WriteAllVectored<'fd, B, N, D>>
{
    type Output = io::Result<B>;

    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
        unsafe { Pin::map_unchecked_mut(self, |s| &mut s.fut) }.inner_poll(ctx)
    }
}

// Splice.
op_future! {
    fn AsyncFd::splice_to -> usize,
    struct Splice<'fd> {
        // Doesn't need any fields.
    },
    setup_state: flags: (RawFd, SpliceDirection, u64, u64, u32, libc::c_int),
    setup: |submission, fd, (), (target, direction, off_in, off_out, len, flags)| unsafe {
        let (fd_in, fd_out) = match direction {
            SpliceDirection::To => (fd.fd(), target),
            SpliceDirection::From => (target, fd.fd()),
        };
        submission.splice(fd_in, off_in, fd_out, off_out, len, flags);
    },
    map_result: |n| {
        #[allow(clippy::cast_sign_loss)] // Negative values are mapped to errors.
        Ok(n as usize)
    },
}

#[derive(Copy, Clone, Debug)]
enum SpliceDirection {
    To,
    From,
}

/// [`Future`] behind [`AsyncFd::close`].
#[derive(Debug)]
#[must_use = "`Future`s do nothing unless polled"]
pub struct Close<D: Descriptor = File> {
    sq: SubmissionQueue,
    state: OpState<RawFd>,
    kind: PhantomData<D>,
}

impl<D: Descriptor + Unpin> Future for Close<D> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let op_index = poll_state!(Close, self.state, self.sq, ctx, |submission, fd| unsafe {
            submission.close(fd);
            D::use_flags(submission);
        });

        match self.sq.poll_op(ctx, op_index) {
            Poll::Ready(result) => {
                self.state = OpState::Done;
                match result {
                    Ok(_) => Poll::Ready(Ok(())),
                    Err(err) => Poll::Ready(Err(err)),
                }
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Trait that defines the behaviour of buffers used in reading, which requires
/// mutable access.
///
/// # Safety
///
/// Unlike normal buffers, the buffer implementations for A10 have additional
/// requirements.
///
/// If the operation (that uses this buffer) is not polled to completion, i.e.
/// the `Future` is dropped before it returns `Poll::Ready`, the kernel still
/// has access to the buffer and will still attempt to write into it. This means
/// that we must delay deallocation in such a way that the kernel will not write
/// into memory we don't have access to any more. This makes, for example,
/// stack-based buffers unfit to implement `BufMut`: we can't delay the
/// deallocation once the buffer is dropped, and the kernel would overwrite part
/// of your stack (where the buffer used to be)!
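///
/// # Examples
///
/// A minimal sketch of how a `BufMut` buffer is used, assuming `fd` is an
/// [`AsyncFd`] (creation not shown here). The buffer is moved into the
/// operation and handed back on completion, which is what allows an
/// implementation to keep the allocation alive for the kernel.
///
/// ```ignore
/// // `Vec<u8>` implements `BufMut` using its spare capacity.
/// let buf = fd.read(Vec::with_capacity(4096)).await?;
/// assert!(buf.len() <= 4096);
/// ```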
pub unsafe trait BufMut: 'static {
    /// Returns the writable buffer as pointer and length parts.
    ///
    /// # Safety
    ///
    /// Only initialised bytes may be written to the returned pointer. The
    /// pointer *may* point to uninitialised bytes, so reading from the pointer
    /// is UB.
    ///
    /// The implementation must ensure that the pointer is valid, i.e. not null
    /// and pointing to memory owned by the buffer. Furthermore it must ensure
    /// that the returned length is, in combination with the pointer, valid. In
    /// other words the memory the pointer and length are pointing to must be a
    /// valid memory address and owned by the buffer.
    ///
    /// Note that the above requirements only apply to implementations outside
    /// of A10. **This trait is unfit for external use!**
    ///
    /// # Why not a slice?
    ///
    /// Returning a slice `&[u8]` would prevent us from using uninitialised
    /// bytes, meaning we would have to zero the buffer before use, which is not
    /// ideal for performance. So, naturally, you would suggest
    /// `&[MaybeUninit<u8>]`, however that doesn't work for buffer types that
    /// only hold initialised bytes. Returning a `MaybeUninit` slice for such a
    /// type would be unsound as it would allow the caller to write
    /// uninitialised bytes without using `unsafe`.
    ///
    /// # Notes
    ///
    /// Most Rust APIs use a `usize` for length, but io_uring uses `u32`, hence
    /// we do also.
    unsafe fn parts_mut(&mut self) -> (*mut u8, u32);

    /// Mark `n` bytes as initialised.
    ///
    /// # Safety
    ///
    /// The caller must ensure that `n` bytes, starting at the pointer returned
    /// by [`BufMut::parts_mut`], are initialised.
    unsafe fn set_init(&mut self, n: usize);

    /// Buffer group id, or `None` if it's not part of a buffer pool.
    ///
    /// Don't implement this.
    #[doc(hidden)]
    fn buffer_group(&self) -> Option<BufGroupId> {
        None
    }

    /// Mark `n` bytes as initialised in buffer with `idx`.
    ///
    /// Don't implement this.
    #[doc(hidden)]
    unsafe fn buffer_init(&mut self, idx: BufIdx, n: u32) {
        debug_assert!(idx.0 == 0);
        self.set_init(n as usize);
    }
}

/// The implementation for `Vec<u8>` only uses the unused capacity, so any bytes
/// already in the buffer will be untouched.
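///
/// A minimal sketch (assuming `fd` is an [`AsyncFd`]): the vector needs spare
/// capacity for the read to be able to return any bytes.
///
/// ```ignore
/// let mut buf = Vec::with_capacity(4096);
/// buf.extend_from_slice(b"already here: ");
/// // The read appends to the existing bytes, it never overwrites them.
/// let buf = fd.read(buf).await?;
/// assert!(buf.starts_with(b"already here: "));
/// ```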
// SAFETY: `Vec<u8>` manages the allocation of the bytes, so as long as it's
// alive, so is the slice of bytes. When the `Vec`tor is leaked the allocation
// will also be leaked.
unsafe impl BufMut for Vec<u8> {
    unsafe fn parts_mut(&mut self) -> (*mut u8, u32) {
        let slice = self.spare_capacity_mut();
        (slice.as_mut_ptr().cast(), slice.len() as u32)
    }

    unsafe fn set_init(&mut self, n: usize) {
        self.set_len(self.len() + n);
    }
}

/// Trait that defines the behaviour of buffers used in reading using vectored
/// I/O, which requires mutable access.
///
/// # Safety
///
/// This has the same safety requirements as [`BufMut`], but then for all
/// buffers used.
pub unsafe trait BufMutSlice<const N: usize>: 'static {
    /// Returns the writable buffers as `iovec` structures.
    ///
    /// # Safety
    ///
    /// This has the same safety requirements as [`BufMut::parts_mut`], but then
    /// for all buffers used.
    unsafe fn as_iovecs_mut(&mut self) -> [libc::iovec; N];

    /// Mark `n` bytes as initialised.
    ///
    /// # Safety
    ///
    /// The caller must ensure that `n` bytes are initialised in the vectors
    /// returned by [`BufMutSlice::as_iovecs_mut`].
    ///
    /// The implementation must ensure that the proper buffer(s) are
    /// initialised. For example, when this is called with `n = 10` and two
    /// buffers of size `8`, the implementation should initialise the first
    /// buffer with `n = 8` and the second with `n = 10 - 8 = 2`.
    unsafe fn set_init(&mut self, n: usize);
}

// SAFETY: `BufMutSlice` has the same safety requirements as `BufMut` and since
// `B` implements `BufMut` it's safe to implement `BufMutSlice` for an array of
// `B`.
unsafe impl<B: BufMut, const N: usize> BufMutSlice<N> for [B; N] {
    unsafe fn as_iovecs_mut(&mut self) -> [libc::iovec; N] {
        // TODO: replace with `MaybeUninit::uninit_array` once stable.
        // SAFETY: an uninitialised `MaybeUninit` is valid.
        let mut iovecs =
            unsafe { MaybeUninit::<[MaybeUninit<libc::iovec>; N]>::uninit().assume_init() };
        for (buf, iovec) in self.iter_mut().zip(iovecs.iter_mut()) {
            debug_assert!(
                buf.buffer_group().is_none(),
                "can't use a10::ReadBuf as a10::BufMutSlice in vectored I/O"
            );
            let (ptr, len) = buf.parts_mut();
            iovec.write(libc::iovec {
                iov_base: ptr.cast(),
                iov_len: len as _,
            });
        }
        // TODO: replace with `MaybeUninit::array_assume_init` once stable.
        // SAFETY: `MaybeUninit<libc::iovec>` and `iovec` have the same layout
        // as guaranteed by `MaybeUninit`.
        unsafe { std::mem::transmute_copy(&std::mem::ManuallyDrop::new(iovecs)) }
    }

    unsafe fn set_init(&mut self, n: usize) {
        let mut left = n;
        for buf in self {
            let (_, len) = buf.parts_mut();
            let len = len as usize;
            if len < left {
                // Fully initialised the buffer.
                buf.set_init(len);
                left -= len;
            } else {
                // Partially initialised the buffer.
                buf.set_init(left);
                return;
            }
        }
        unreachable!(
            "called BufMutSlice::set_init({n}), with buffers totaling {} in size",
            n - left
        );
    }
}

// NOTE: Also see implementation of `BufMutSlice` for tuples in the macro
// `buf_slice_for_tuple` below.

/// Trait that defines the behaviour of buffers used in writing, which requires
/// read only access.
///
/// # Safety
///
/// Unlike normal buffers, the buffer implementations for A10 have additional
/// requirements.
///
/// If the operation (that uses this buffer) is not polled to completion, i.e.
/// the `Future` is dropped before it returns `Poll::Ready`, the kernel still
/// has access to the buffer and will still attempt to read from it. This means
/// that we must delay deallocation in such a way that the kernel will not read
/// memory we don't have access to any more. This makes, for example,
/// stack-based buffers unfit to implement `Buf`: we can't delay the
/// deallocation once the buffer is dropped, and the kernel would read part of
/// your stack (where the buffer used to be)! This would be a huge security
/// risk.
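///
/// # Examples
///
/// A minimal sketch, assuming `fd` is an [`AsyncFd`] (creation not shown
/// here). `&'static str` implements `Buf` because the bytes can never be
/// deallocated.
///
/// ```ignore
/// fd.write_all("this string lives for 'static\n").await?;
/// ```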
pub unsafe trait Buf: 'static {
    /// Returns the readable buffer as pointer and length parts.
    ///
    /// # Safety
    ///
    /// The implementation must ensure that the pointer is valid, i.e. not null
    /// and pointing to memory owned by the buffer. Furthermore it must ensure
    /// that the returned length is, in combination with the pointer, valid. In
    /// other words the memory the pointer and length are pointing to must be a
    /// valid memory address and owned by the buffer.
    ///
    /// # Notes
    ///
    /// Most Rust APIs use a `usize` for length, but io_uring uses `u32`, hence
    /// we do also.
    unsafe fn parts(&self) -> (*const u8, u32);
}

// SAFETY: `Vec<u8>` manages the allocation of the bytes, so as long as it's
// alive, so is the slice of bytes. When the `Vec`tor is leaked the allocation
// will also be leaked.
unsafe impl Buf for Vec<u8> {
    unsafe fn parts(&self) -> (*const u8, u32) {
        let slice = self.as_slice();
        (slice.as_ptr().cast(), slice.len() as u32)
    }
}

// SAFETY: `Box<[u8]>` manages the allocation of the bytes, so as long as it's
// alive, so is the slice of bytes. When the `Box` is leaked the allocation will
// also be leaked.
unsafe impl Buf for Box<[u8]> {
    unsafe fn parts(&self) -> (*const u8, u32) {
        (self.as_ptr().cast(), self.len() as u32)
    }
}

// SAFETY: `String` is just a `Vec<u8>`, see its implementation for the safety
// reasoning.
unsafe impl Buf for String {
    unsafe fn parts(&self) -> (*const u8, u32) {
        let slice = self.as_bytes();
        (slice.as_ptr().cast(), slice.len() as u32)
    }
}

// SAFETY: because the reference has a `'static` lifetime we know the bytes
// can't be deallocated, so it's safe to implement `Buf`.
unsafe impl Buf for &'static [u8] {
    unsafe fn parts(&self) -> (*const u8, u32) {
        (self.as_ptr(), self.len() as u32)
    }
}

// SAFETY: because the reference has a `'static` lifetime we know the bytes
// can't be deallocated, so it's safe to implement `Buf`.
unsafe impl Buf for &'static str {
    unsafe fn parts(&self) -> (*const u8, u32) {
        (self.as_bytes().as_ptr(), self.len() as u32)
    }
}

/// Trait that defines the behaviour of buffers used in writing using vectored
/// I/O, which requires read only access.
///
/// # Safety
///
/// This has the same safety requirements as [`Buf`], but then for all buffers
/// used.
pub unsafe trait BufSlice<const N: usize>: 'static {
    /// Returns the readable buffers as `iovec` structures.
    ///
    /// # Safety
    ///
    /// This has the same safety requirements as [`Buf::parts`], but then for
    /// all buffers used.
    unsafe fn as_iovecs(&self) -> [libc::iovec; N];
}

// SAFETY: `BufSlice` has the same safety requirements as `Buf` and since `B`
// implements `Buf` it's safe to implement `BufSlice` for an array of `B`.
unsafe impl<B: Buf, const N: usize> BufSlice<N> for [B; N] {
    unsafe fn as_iovecs(&self) -> [libc::iovec; N] {
        // TODO: replace with `MaybeUninit::uninit_array` once stable.
        // SAFETY: an uninitialised `MaybeUninit` is valid.
        let mut iovecs =
            unsafe { MaybeUninit::<[MaybeUninit<libc::iovec>; N]>::uninit().assume_init() };
        for (buf, iovec) in self.iter().zip(iovecs.iter_mut()) {
            let (ptr, len) = buf.parts();
            iovec.write(libc::iovec {
                iov_base: ptr as _,
                iov_len: len as _,
            });
        }
        // TODO: replace with `MaybeUninit::array_assume_init` once stable.
        // SAFETY: `MaybeUninit<libc::iovec>` and `iovec` have the same layout
        // as guaranteed by `MaybeUninit`.
        unsafe { std::mem::transmute_copy(&std::mem::ManuallyDrop::new(iovecs)) }
    }
}

macro_rules! buf_slice_for_tuple {
    (
        // Number of values.
        $N: expr,
        // Generic parameter name and tuple index.
        $( $generic: ident . $index: tt ),+
    ) => {
        // SAFETY: `BufMutSlice` has the same safety requirements as `BufMut`
        // and since all generic buffers must implement `BufMut` it's safe to
        // implement `BufMutSlice` for a tuple of all those buffers.
        unsafe impl<$( $generic: BufMut ),+> BufMutSlice<$N> for ($( $generic ),+) {
            unsafe fn as_iovecs_mut(&mut self) -> [libc::iovec; $N] {
                [
                    $({
                        debug_assert!(
                            self.$index.buffer_group().is_none(),
                            "can't use a10::ReadBuf as a10::BufMutSlice in vectored I/O"
                        );
                        let (ptr, len) = self.$index.parts_mut();
                        libc::iovec {
                            iov_base: ptr.cast(),
                            iov_len: len as _,
                        }
                    }),+
                ]
            }

            unsafe fn set_init(&mut self, n: usize) {
                let mut left = n;
                $({
                    let (_, len) = self.$index.parts_mut();
                    let len = len as usize;
                    if len < left {
                        // Fully initialised the buffer.
                        self.$index.set_init(len);
                        left -= len;
                    } else {
                        // Partially initialised the buffer.
                        self.$index.set_init(left);
                        return;
                    }
                })+
                unreachable!(
                    "called BufMutSlice::set_init({n}), with buffers totaling {} in size",
                    n - left
                );
            }
        }

        // SAFETY: `BufSlice` has the same safety requirements as `Buf` and
        // since all generic buffers must implement `Buf` it's safe to implement
        // `BufSlice` for a tuple of all those buffers.
        unsafe impl<$( $generic: Buf ),+> BufSlice<$N> for ($( $generic ),+) {
            unsafe fn as_iovecs(&self) -> [libc::iovec; $N] {
                [
                    $({
                        let (ptr, len) = self.$index.parts();
                        libc::iovec {
                            iov_base: ptr as _,
                            iov_len: len as _,
                        }
                    }),+
                ]
            }
        }
    };
}

buf_slice_for_tuple!(2, A.0, B.1);
buf_slice_for_tuple!(3, A.0, B.1, C.2);
buf_slice_for_tuple!(4, A.0, B.1, C.2, D.3);
buf_slice_for_tuple!(5, A.0, B.1, C.2, D.3, E.4);
buf_slice_for_tuple!(6, A.0, B.1, C.2, D.3, E.4, F.5);
buf_slice_for_tuple!(7, A.0, B.1, C.2, D.3, E.4, F.5, G.6);
buf_slice_for_tuple!(8, A.0, B.1, C.2, D.3, E.4, F.5, G.6, I.7);