mio_aio/
aio.rs

1// vim: tw=80
2use std::{
3    io::{self, IoSlice, IoSliceMut},
4    os::unix::io::{AsRawFd, BorrowedFd, RawFd},
5    pin::Pin,
6};
7
8use mio::{event, Interest, Registry, Token};
9pub use nix::sys::aio::AioFsyncMode;
10use nix::{
11    libc::off_t,
12    sys::{
13        aio::{self, Aio},
14        event::EventFlag,
15        signal::SigevNotify,
16    },
17};
18
19/// Return type of [`Source::read_at`]
20pub type ReadAt<'a> = Source<aio::AioRead<'a>>;
21/// Return type of [`Source::readv_at`]
22pub type ReadvAt<'a> = Source<aio::AioReadv<'a>>;
23/// Return type of [`Source::fsync`]
24pub type Fsync<'a> = Source<aio::AioFsync<'a>>;
25/// Return type of [`Source::write_at`]
26pub type WriteAt<'a> = Source<aio::AioWrite<'a>>;
27/// Return type of [`Source::writev_at`]
28pub type WritevAt<'a> = Source<aio::AioWritev<'a>>;
29
30/// Common methods supported by all POSIX AIO Mio sources
31pub trait SourceApi {
32    /// Return type of [`SourceApi::aio_return`].
33    type Output;
34
35    /// Read the final result of the operation
36    fn aio_return(self: Pin<&mut Self>) -> nix::Result<Self::Output>;
37
38    /// Ask the operating system to cancel the operation
39    ///
40    /// Most file systems on most operating systems don't actually support
41    /// cancellation; they'll just return `AIO_NOTCANCELED`.
42    fn cancel(self: Pin<&mut Self>) -> nix::Result<aio::AioCancelStat>;
43
44    /// Retrieve the status of an in-progress or complete operation.
45    ///
46    /// Not usually needed, since `mio_aio` always uses kqueue for notification.
47    fn error(self: Pin<&mut Self>) -> nix::Result<()>;
48
49    /// Does this operation currently have any in-kernel state?
50    fn in_progress(&self) -> bool;
51
52    /// Extra registration method needed by Tokio
53    #[cfg(feature = "tokio")]
54    #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
55    fn deregister_raw(&mut self);
56
57    /// Extra registration method needed by Tokio
58    #[cfg(feature = "tokio")]
59    #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
60    fn register_raw(&mut self, kq: RawFd, udata: usize);
61
62    /// Actually start the I/O operation.
63    ///
64    /// After calling this method and until [`SourceApi::aio_return`] returns
65    /// `Ok`, the structure may not be moved in memory.
66    fn submit(self: Pin<&mut Self>) -> nix::Result<()>;
67}
68
69/// A Mio source based on a single POSIX AIO operation.
70///
71/// The generic parameter specifies exactly which operation it is.  This struct
72/// implements `mio::Source`.  After creation, use `mio::Source::register` to
73/// connect it to the event loop.
74#[derive(Debug)]
75pub struct Source<T> {
76    inner: T,
77}
78impl<T: Aio> Source<T> {
79    pin_utils::unsafe_pinned!(inner: T);
80
81    fn _deregister_raw(&mut self) {
82        let sigev = SigevNotify::SigevNone;
83        self.inner.set_sigev_notify(sigev);
84    }
85
86    fn _register_raw(&mut self, kq: RawFd, udata: usize) {
87        let sigev = SigevNotify::SigevKeventFlags {
88            kq,
89            udata: udata as isize,
90            flags: EventFlag::EV_ONESHOT,
91        };
92        self.inner.set_sigev_notify(sigev);
93    }
94}
95
96impl<T: Aio> SourceApi for Source<T> {
97    type Output = T::Output;
98
99    fn aio_return(self: Pin<&mut Self>) -> nix::Result<Self::Output> {
100        self.inner().aio_return()
101    }
102
103    fn cancel(self: Pin<&mut Self>) -> nix::Result<aio::AioCancelStat> {
104        self.inner().cancel()
105    }
106
107    #[cfg(feature = "tokio")]
108    fn deregister_raw(&mut self) {
109        self._deregister_raw()
110    }
111
112    fn error(self: Pin<&mut Self>) -> nix::Result<()> {
113        self.inner().error()
114    }
115
116    fn in_progress(&self) -> bool {
117        self.inner.in_progress()
118    }
119
120    #[cfg(feature = "tokio")]
121    fn register_raw(&mut self, kq: RawFd, udata: usize) {
122        self._register_raw(kq, udata)
123    }
124
125    fn submit(self: Pin<&mut Self>) -> nix::Result<()> {
126        self.inner().submit()
127    }
128}
129
130impl<T: Aio> event::Source for Source<T> {
131    fn register(
132        &mut self,
133        registry: &Registry,
134        token: Token,
135        interests: Interest,
136    ) -> io::Result<()> {
137        assert!(interests.is_aio());
138        let udata = usize::from(token);
139        let kq = registry.as_raw_fd();
140        self._register_raw(kq, udata);
141        Ok(())
142    }
143
144    fn reregister(
145        &mut self,
146        registry: &Registry,
147        token: Token,
148        interests: Interest,
149    ) -> io::Result<()> {
150        self.register(registry, token, interests)
151    }
152
153    fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
154        self._deregister_raw();
155        Ok(())
156    }
157}
158
159impl<'a> Source<aio::AioFsync<'a>> {
160    /// Asynchronously fsync a file.
161    pub fn fsync(fd: BorrowedFd<'a>, mode: AioFsyncMode, prio: i32) -> Self {
162        let inner = aio::AioFsync::new(fd, mode, prio, SigevNotify::SigevNone);
163        Source { inner }
164    }
165}
166
167impl<'a> Source<aio::AioRead<'a>> {
168    /// Asynchronously read from a file.
169    pub fn read_at(
170        fd: BorrowedFd<'a>,
171        offs: u64,
172        buf: &'a mut [u8],
173        prio: i32,
174    ) -> Self {
175        let inner = aio::AioRead::new(
176            fd,
177            offs as off_t,
178            buf,
179            prio,
180            SigevNotify::SigevNone,
181        );
182        Source { inner }
183    }
184}
185
186impl<'a> Source<aio::AioReadv<'a>> {
187    /// Asynchronously read from a file to a scatter/gather list of buffers.
188    ///
189    /// Requires FreeBSD 13.0 or later.
190    pub fn readv_at(
191        fd: BorrowedFd<'a>,
192        offs: u64,
193        bufs: &mut [IoSliceMut<'a>],
194        prio: i32,
195    ) -> Self {
196        let inner = aio::AioReadv::new(
197            fd,
198            offs as off_t,
199            bufs,
200            prio,
201            SigevNotify::SigevNone,
202        );
203        Source { inner }
204    }
205}
206
207impl<'a> Source<aio::AioWrite<'a>> {
208    /// Asynchronously write to a file.
209    pub fn write_at(
210        fd: BorrowedFd<'a>,
211        offs: u64,
212        buf: &'a [u8],
213        prio: i32,
214    ) -> Self {
215        let inner = aio::AioWrite::new(
216            fd,
217            offs as off_t,
218            buf,
219            prio,
220            SigevNotify::SigevNone,
221        );
222        Source { inner }
223    }
224}
225
226impl<'a> Source<aio::AioWritev<'a>> {
227    /// Asynchronously write to a file to a scatter/gather list of buffers.
228    ///
229    /// Requires FreeBSD 13.0 or later.
230    pub fn writev_at(
231        fd: BorrowedFd<'a>,
232        offs: u64,
233        bufs: &[IoSlice<'a>],
234        prio: i32,
235    ) -> Self {
236        let inner = aio::AioWritev::new(
237            fd,
238            offs as off_t,
239            bufs,
240            prio,
241            SigevNotify::SigevNone,
242        );
243        Source { inner }
244    }
245}