compio_driver/
op.rs

1//! The async operations.
2//!
3//! Types in this mod represents the low-level operations passed to kernel.
4//! The operation itself doesn't perform anything.
5//! You need to pass them to [`crate::Proactor`], and poll the driver.
6
7use std::{io, marker::PhantomPinned, mem::ManuallyDrop, net::Shutdown};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, SetBufInit};
10use socket2::SockAddr;
11
12#[cfg(windows)]
13pub use crate::sys::op::{ConnectNamedPipe, DeviceIoControl};
14pub use crate::sys::op::{
15    Accept, Recv, RecvFrom, RecvFromVectored, RecvMsg, RecvVectored, Send, SendMsg, SendTo,
16    SendToVectored, SendVectored,
17};
18#[cfg(unix)]
19pub use crate::sys::op::{
20    CreateDir, CreateSocket, FileStat, HardLink, Interest, OpenFile, PathStat, PollOnce,
21    ReadVectoredAt, Rename, Symlink, Unlink, WriteVectoredAt,
22};
23#[cfg(io_uring)]
24pub use crate::sys::op::{ReadManagedAt, RecvManaged};
25use crate::{
26    OwnedFd, TakeBuffer,
27    sys::{sockaddr_storage, socklen_t},
28};
29
30/// Trait to update the buffer length inside the [`BufResult`].
31pub trait BufResultExt {
32    /// Call [`SetBufInit::set_buf_init`] if the result is [`Ok`].
33    fn map_advanced(self) -> Self;
34}
35
36impl<T: SetBufInit> BufResultExt for BufResult<usize, T> {
37    fn map_advanced(self) -> Self {
38        self.map_res(|res| (res, ()))
39            .map_advanced()
40            .map_res(|(res, _)| res)
41    }
42}
43
44impl<T: SetBufInit, O> BufResultExt for BufResult<(usize, O), T> {
45    fn map_advanced(self) -> Self {
46        self.map(|(init, obj), mut buffer| {
47            unsafe {
48                buffer.set_buf_init(init);
49            }
50            ((init, obj), buffer)
51        })
52    }
53}
54
55impl<T: SetBufInit, C: SetBufInit, O> BufResultExt for BufResult<(usize, usize, O), (T, C)> {
56    fn map_advanced(self) -> Self {
57        self.map(
58            |(init_buffer, init_control, obj), (mut buffer, mut control)| {
59                unsafe {
60                    buffer.set_buf_init(init_buffer);
61                    control.set_buf_init(init_control);
62                }
63                ((init_buffer, init_control, obj), (buffer, control))
64            },
65        )
66    }
67}
68
69/// Helper trait for [`RecvFrom`], [`RecvFromVectored`] and [`RecvMsg`].
70pub trait RecvResultExt {
71    /// The mapped result.
72    type RecvResult;
73
74    /// Create [`SockAddr`] if the result is [`Ok`].
75    fn map_addr(self) -> Self::RecvResult;
76}
77
78impl<T> RecvResultExt for BufResult<usize, (T, sockaddr_storage, socklen_t)> {
79    type RecvResult = BufResult<(usize, SockAddr), T>;
80
81    fn map_addr(self) -> Self::RecvResult {
82        self.map_buffer(|(buffer, addr_buffer, addr_size)| (buffer, addr_buffer, addr_size, 0))
83            .map_addr()
84            .map_res(|(res, _, addr)| (res, addr))
85    }
86}
87
88impl<T> RecvResultExt for BufResult<usize, (T, sockaddr_storage, socklen_t, usize)> {
89    type RecvResult = BufResult<(usize, usize, SockAddr), T>;
90
91    fn map_addr(self) -> Self::RecvResult {
92        self.map2(
93            |res, (buffer, addr_buffer, addr_size, len)| {
94                let addr = unsafe { SockAddr::new(addr_buffer, addr_size) };
95                ((res, len, addr), buffer)
96            },
97            |(buffer, ..)| buffer,
98        )
99    }
100}
101
102/// Helper trait for [`ReadManagedAt`] and [`RecvManaged`].
103pub trait ResultTakeBuffer {
104    /// The buffer pool of the op.
105    type BufferPool;
106    /// The buffer type of the op.
107    type Buffer<'a>;
108
109    /// Take the buffer from result.
110    fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>>;
111}
112
113impl<T: TakeBuffer> ResultTakeBuffer for (BufResult<usize, T>, u32) {
114    type Buffer<'a> = T::Buffer<'a>;
115    type BufferPool = T::BufferPool;
116
117    fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>> {
118        let (BufResult(result, op), flags) = self;
119        op.take_buffer(pool, result, flags)
120    }
121}
122
123/// Spawn a blocking function in the thread pool.
124pub struct Asyncify<F, D> {
125    pub(crate) f: Option<F>,
126    pub(crate) data: Option<D>,
127    _p: PhantomPinned,
128}
129
130impl<F, D> Asyncify<F, D> {
131    /// Create [`Asyncify`].
132    pub fn new(f: F) -> Self {
133        Self {
134            f: Some(f),
135            data: None,
136            _p: PhantomPinned,
137        }
138    }
139}
140
141impl<F, D> IntoInner for Asyncify<F, D> {
142    type Inner = D;
143
144    fn into_inner(mut self) -> Self::Inner {
145        self.data.take().expect("the data should not be None")
146    }
147}
148
149/// Close the file fd.
150pub struct CloseFile {
151    pub(crate) fd: ManuallyDrop<OwnedFd>,
152}
153
154impl CloseFile {
155    /// Create [`CloseFile`].
156    pub fn new(fd: OwnedFd) -> Self {
157        Self {
158            fd: ManuallyDrop::new(fd),
159        }
160    }
161}
162
163/// Read a file at specified position into specified buffer.
164#[derive(Debug)]
165pub struct ReadAt<T: IoBufMut, S> {
166    pub(crate) fd: S,
167    pub(crate) offset: u64,
168    pub(crate) buffer: T,
169    #[cfg(aio)]
170    pub(crate) aiocb: libc::aiocb,
171    _p: PhantomPinned,
172}
173
174impl<T: IoBufMut, S> ReadAt<T, S> {
175    /// Create [`ReadAt`].
176    pub fn new(fd: S, offset: u64, buffer: T) -> Self {
177        Self {
178            fd,
179            offset,
180            buffer,
181            #[cfg(aio)]
182            aiocb: unsafe { std::mem::zeroed() },
183            _p: PhantomPinned,
184        }
185    }
186}
187
188impl<T: IoBufMut, S> IntoInner for ReadAt<T, S> {
189    type Inner = T;
190
191    fn into_inner(self) -> Self::Inner {
192        self.buffer
193    }
194}
195
196/// Write a file at specified position from specified buffer.
197#[derive(Debug)]
198pub struct WriteAt<T: IoBuf, S> {
199    pub(crate) fd: S,
200    pub(crate) offset: u64,
201    pub(crate) buffer: T,
202    #[cfg(aio)]
203    pub(crate) aiocb: libc::aiocb,
204    _p: PhantomPinned,
205}
206
207impl<T: IoBuf, S> WriteAt<T, S> {
208    /// Create [`WriteAt`].
209    pub fn new(fd: S, offset: u64, buffer: T) -> Self {
210        Self {
211            fd,
212            offset,
213            buffer,
214            #[cfg(aio)]
215            aiocb: unsafe { std::mem::zeroed() },
216            _p: PhantomPinned,
217        }
218    }
219}
220
221impl<T: IoBuf, S> IntoInner for WriteAt<T, S> {
222    type Inner = T;
223
224    fn into_inner(self) -> Self::Inner {
225        self.buffer
226    }
227}
228
229/// Sync data to the disk.
230pub struct Sync<S> {
231    pub(crate) fd: S,
232    #[allow(dead_code)]
233    pub(crate) datasync: bool,
234    #[cfg(aio)]
235    pub(crate) aiocb: libc::aiocb,
236}
237
238impl<S> Sync<S> {
239    /// Create [`Sync`].
240    ///
241    /// If `datasync` is `true`, the file metadata may not be synchronized.
242    pub fn new(fd: S, datasync: bool) -> Self {
243        Self {
244            fd,
245            datasync,
246            #[cfg(aio)]
247            aiocb: unsafe { std::mem::zeroed() },
248        }
249    }
250}
251
252/// Shutdown a socket.
253pub struct ShutdownSocket<S> {
254    pub(crate) fd: S,
255    pub(crate) how: Shutdown,
256}
257
258impl<S> ShutdownSocket<S> {
259    /// Create [`ShutdownSocket`].
260    pub fn new(fd: S, how: Shutdown) -> Self {
261        Self { fd, how }
262    }
263}
264
265/// Close socket fd.
266pub struct CloseSocket {
267    pub(crate) fd: ManuallyDrop<OwnedFd>,
268}
269
270impl CloseSocket {
271    /// Create [`CloseSocket`].
272    pub fn new(fd: OwnedFd) -> Self {
273        Self {
274            fd: ManuallyDrop::new(fd),
275        }
276    }
277}
278
279/// Connect to a remote address.
280pub struct Connect<S> {
281    pub(crate) fd: S,
282    pub(crate) addr: SockAddr,
283}
284
285impl<S> Connect<S> {
286    /// Create [`Connect`]. `fd` should be bound.
287    pub fn new(fd: S, addr: SockAddr) -> Self {
288        Self { fd, addr }
289    }
290}
291
292#[cfg(any(not(io_uring), fusion))]
293pub(crate) mod managed {
294    use std::io;
295
296    use compio_buf::IntoInner;
297
298    use super::{ReadAt, Recv};
299    use crate::{BorrowedBuffer, BufferPool, OwnedBuffer, TakeBuffer};
300
301    /// Read a file at specified position into managed buffer.
302    pub struct ReadManagedAt<S> {
303        pub(crate) op: ReadAt<OwnedBuffer, S>,
304    }
305
306    impl<S> ReadManagedAt<S> {
307        /// Create [`ReadManagedAt`].
308        pub fn new(fd: S, offset: u64, pool: &BufferPool, len: usize) -> io::Result<Self> {
309            #[cfg(fusion)]
310            let pool = pool.as_poll();
311            Ok(Self {
312                op: ReadAt::new(fd, offset, pool.get_buffer(len)?),
313            })
314        }
315    }
316
317    impl<S> TakeBuffer for ReadManagedAt<S> {
318        type Buffer<'a> = BorrowedBuffer<'a>;
319        type BufferPool = BufferPool;
320
321        fn take_buffer(
322            self,
323            buffer_pool: &BufferPool,
324            result: io::Result<usize>,
325            _: u32,
326        ) -> io::Result<BorrowedBuffer<'_>> {
327            let result = result?;
328            #[cfg(fusion)]
329            let buffer_pool = buffer_pool.as_poll();
330            let slice = self.op.into_inner();
331            // Safety: result is valid
332            let res = unsafe { buffer_pool.create_proxy(slice, result) };
333            #[cfg(fusion)]
334            let res = BorrowedBuffer::new_poll(res);
335            Ok(res)
336        }
337    }
338
339    /// Receive data from remote into managed buffer.
340    pub struct RecvManaged<S> {
341        pub(crate) op: Recv<OwnedBuffer, S>,
342    }
343
344    impl<S> RecvManaged<S> {
345        /// Create [`RecvManaged`].
346        pub fn new(fd: S, pool: &BufferPool, len: usize) -> io::Result<Self> {
347            #[cfg(fusion)]
348            let pool = pool.as_poll();
349            Ok(Self {
350                op: Recv::new(fd, pool.get_buffer(len)?),
351            })
352        }
353    }
354
355    impl<S> TakeBuffer for RecvManaged<S> {
356        type Buffer<'a> = BorrowedBuffer<'a>;
357        type BufferPool = BufferPool;
358
359        fn take_buffer(
360            self,
361            buffer_pool: &Self::BufferPool,
362            result: io::Result<usize>,
363            _: u32,
364        ) -> io::Result<Self::Buffer<'_>> {
365            let result = result?;
366            #[cfg(fusion)]
367            let buffer_pool = buffer_pool.as_poll();
368            let slice = self.op.into_inner();
369            // Safety: result is valid
370            let res = unsafe { buffer_pool.create_proxy(slice, result) };
371            #[cfg(fusion)]
372            let res = BorrowedBuffer::new_poll(res);
373            Ok(res)
374        }
375    }
376}
377
378#[cfg(not(io_uring))]
379pub use managed::*;