compio_driver/
op.rs

1//! The async operations.
2//!
//! Types in this mod represent the low-level operations passed to the kernel.
4//! The operation itself doesn't perform anything.
5//! You need to pass them to [`crate::Proactor`], and poll the driver.
6
7use std::{io, marker::PhantomPinned, mem::ManuallyDrop, net::Shutdown};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, SetBufInit};
10use socket2::{SockAddr, SockAddrStorage, socklen_t};
11
12pub use crate::sys::op::{
13    Accept, Recv, RecvFrom, RecvFromVectored, RecvMsg, RecvVectored, Send, SendMsg, SendTo,
14    SendToVectored, SendVectored,
15};
16#[cfg(windows)]
17pub use crate::sys::op::{ConnectNamedPipe, DeviceIoControl};
18#[cfg(unix)]
19pub use crate::sys::op::{
20    CreateDir, CreateSocket, FileStat, HardLink, Interest, OpenFile, PathStat, PollOnce,
21    ReadVectoredAt, Rename, Symlink, Unlink, WriteVectoredAt,
22};
23#[cfg(io_uring)]
24pub use crate::sys::op::{ReadManagedAt, RecvManaged};
25use crate::{OwnedFd, TakeBuffer};
26
/// Trait to update the buffer length inside the [`BufResult`].
///
/// This module implements it for [`BufResult`] values whose `Ok` payload
/// contains the `usize` length(s) that are forwarded to
/// [`SetBufInit::set_buf_init`].
pub trait BufResultExt {
    /// Call [`SetBufInit::set_buf_init`] if the result is [`Ok`].
    ///
    /// Consumes the result and returns a value of the same type.
    fn map_advanced(self) -> Self;
}
32
33impl<T: SetBufInit> BufResultExt for BufResult<usize, T> {
34    fn map_advanced(self) -> Self {
35        self.map_res(|res| (res, ()))
36            .map_advanced()
37            .map_res(|(res, _)| res)
38    }
39}
40
41impl<T: SetBufInit, O> BufResultExt for BufResult<(usize, O), T> {
42    fn map_advanced(self) -> Self {
43        self.map(|(init, obj), mut buffer| {
44            unsafe {
45                buffer.set_buf_init(init);
46            }
47            ((init, obj), buffer)
48        })
49    }
50}
51
52impl<T: SetBufInit, C: SetBufInit, O> BufResultExt for BufResult<(usize, usize, O), (T, C)> {
53    fn map_advanced(self) -> Self {
54        self.map(
55            |(init_buffer, init_control, obj), (mut buffer, mut control)| {
56                unsafe {
57                    buffer.set_buf_init(init_buffer);
58                    control.set_buf_init(init_control);
59                }
60                ((init_buffer, init_control, obj), (buffer, control))
61            },
62        )
63    }
64}
65
/// Helper trait for [`RecvFrom`], [`RecvFromVectored`] and [`RecvMsg`].
///
/// The implementations in this module turn the raw address storage and
/// address length carried next to the buffer into a [`SockAddr`] that becomes
/// part of the `Ok` value.
pub trait RecvResultExt {
    /// The mapped result.
    type RecvResult;

    /// Create [`SockAddr`] if the result is [`Ok`].
    ///
    /// Consumes `self` and returns [`Self::RecvResult`].
    fn map_addr(self) -> Self::RecvResult;
}
74
75impl<T> RecvResultExt for BufResult<usize, (T, SockAddrStorage, socklen_t)> {
76    type RecvResult = BufResult<(usize, SockAddr), T>;
77
78    fn map_addr(self) -> Self::RecvResult {
79        self.map_buffer(|(buffer, addr_buffer, addr_size)| (buffer, addr_buffer, addr_size, 0))
80            .map_addr()
81            .map_res(|(res, _, addr)| (res, addr))
82    }
83}
84
85impl<T> RecvResultExt for BufResult<usize, (T, SockAddrStorage, socklen_t, usize)> {
86    type RecvResult = BufResult<(usize, usize, SockAddr), T>;
87
88    fn map_addr(self) -> Self::RecvResult {
89        self.map2(
90            |res, (buffer, addr_buffer, addr_size, len)| {
91                let addr = unsafe { SockAddr::new(addr_buffer, addr_size) };
92                ((res, len, addr), buffer)
93            },
94            |(buffer, ..)| buffer,
95        )
96    }
97}
98
/// Helper trait for [`ReadManagedAt`] and [`RecvManaged`].
///
/// Lets a completed result be converted into the buffer borrowed from a pool.
pub trait ResultTakeBuffer {
    /// The buffer pool of the op.
    type BufferPool;
    /// The buffer type of the op.
    ///
    /// The lifetime `'a` is the lifetime of the pool reference passed to
    /// [`ResultTakeBuffer::take_buffer`].
    type Buffer<'a>;

    /// Take the buffer from result.
    ///
    /// Consumes `self`; the returned buffer borrows from `pool`.
    fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>>;
}
109
110impl<T: TakeBuffer> ResultTakeBuffer for (BufResult<usize, T>, u32) {
111    type Buffer<'a> = T::Buffer<'a>;
112    type BufferPool = T::BufferPool;
113
114    fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>> {
115        let (BufResult(result, op), flags) = self;
116        op.take_buffer(pool, result, flags)
117    }
118}
119
/// Spawn a blocking function in the thread pool.
///
/// `f` holds the function and `data` holds the value handed out by
/// `into_inner`; both are `Option`s so they can be taken out in place.
pub struct Asyncify<F, D> {
    pub(crate) f: Option<F>,
    pub(crate) data: Option<D>,
    _p: PhantomPinned,
}

impl<F, D> Asyncify<F, D> {
    /// Create [`Asyncify`].
    ///
    /// The function is stored as `Some(f)`; `data` starts out as `None`.
    pub fn new(f: F) -> Self {
        let (f, data) = (Some(f), None);
        Asyncify {
            f,
            data,
            _p: PhantomPinned,
        }
    }
}
137
138impl<F, D> IntoInner for Asyncify<F, D> {
139    type Inner = D;
140
141    fn into_inner(mut self) -> Self::Inner {
142        self.data.take().expect("the data should not be None")
143    }
144}
145
146/// Close the file fd.
147pub struct CloseFile {
148    pub(crate) fd: ManuallyDrop<OwnedFd>,
149}
150
151impl CloseFile {
152    /// Create [`CloseFile`].
153    pub fn new(fd: OwnedFd) -> Self {
154        Self {
155            fd: ManuallyDrop::new(fd),
156        }
157    }
158}
159
160/// Read a file at specified position into specified buffer.
161#[derive(Debug)]
162pub struct ReadAt<T: IoBufMut, S> {
163    pub(crate) fd: S,
164    pub(crate) offset: u64,
165    pub(crate) buffer: T,
166    #[cfg(aio)]
167    pub(crate) aiocb: libc::aiocb,
168    _p: PhantomPinned,
169}
170
171impl<T: IoBufMut, S> ReadAt<T, S> {
172    /// Create [`ReadAt`].
173    pub fn new(fd: S, offset: u64, buffer: T) -> Self {
174        Self {
175            fd,
176            offset,
177            buffer,
178            #[cfg(aio)]
179            aiocb: unsafe { std::mem::zeroed() },
180            _p: PhantomPinned,
181        }
182    }
183}
184
185impl<T: IoBufMut, S> IntoInner for ReadAt<T, S> {
186    type Inner = T;
187
188    fn into_inner(self) -> Self::Inner {
189        self.buffer
190    }
191}
192
193/// Write a file at specified position from specified buffer.
194#[derive(Debug)]
195pub struct WriteAt<T: IoBuf, S> {
196    pub(crate) fd: S,
197    pub(crate) offset: u64,
198    pub(crate) buffer: T,
199    #[cfg(aio)]
200    pub(crate) aiocb: libc::aiocb,
201    _p: PhantomPinned,
202}
203
204impl<T: IoBuf, S> WriteAt<T, S> {
205    /// Create [`WriteAt`].
206    pub fn new(fd: S, offset: u64, buffer: T) -> Self {
207        Self {
208            fd,
209            offset,
210            buffer,
211            #[cfg(aio)]
212            aiocb: unsafe { std::mem::zeroed() },
213            _p: PhantomPinned,
214        }
215    }
216}
217
218impl<T: IoBuf, S> IntoInner for WriteAt<T, S> {
219    type Inner = T;
220
221    fn into_inner(self) -> Self::Inner {
222        self.buffer
223    }
224}
225
/// Sync data to the disk.
pub struct Sync<S> {
    pub(crate) fd: S,
    // NOTE(review): presumably not every backend reads this flag, hence the
    // `dead_code` allowance - confirm.
    #[allow(dead_code)]
    pub(crate) datasync: bool,
    // Only present when the `aio` cfg is enabled.
    #[cfg(aio)]
    pub(crate) aiocb: libc::aiocb,
}

impl<S> Sync<S> {
    /// Create [`Sync`].
    ///
    /// If `datasync` is `true`, the file metadata may not be synchronized.
    pub fn new(fd: S, datasync: bool) -> Self {
        Sync {
            // SAFETY: NOTE(review) - assumes an all-zero `libc::aiocb` is a
            // valid value; confirm against the platform definition.
            #[cfg(aio)]
            aiocb: unsafe { std::mem::zeroed() },
            datasync,
            fd,
        }
    }
}
248
/// Shutdown a socket.
///
/// `how` selects which direction(s) of the connection are shut down.
pub struct ShutdownSocket<S> {
    pub(crate) fd: S,
    pub(crate) how: Shutdown,
}

impl<S> ShutdownSocket<S> {
    /// Create [`ShutdownSocket`].
    pub fn new(fd: S, how: Shutdown) -> Self {
        ShutdownSocket { how, fd }
    }
}
261
262/// Close socket fd.
263pub struct CloseSocket {
264    pub(crate) fd: ManuallyDrop<OwnedFd>,
265}
266
267impl CloseSocket {
268    /// Create [`CloseSocket`].
269    pub fn new(fd: OwnedFd) -> Self {
270        Self {
271            fd: ManuallyDrop::new(fd),
272        }
273    }
274}
275
276/// Connect to a remote address.
277pub struct Connect<S> {
278    pub(crate) fd: S,
279    pub(crate) addr: SockAddr,
280}
281
282impl<S> Connect<S> {
283    /// Create [`Connect`]. `fd` should be bound.
284    pub fn new(fd: S, addr: SockAddr) -> Self {
285        Self { fd, addr }
286    }
287}
288
289#[cfg(any(not(io_uring), fusion))]
290pub(crate) mod managed {
291    use std::io;
292
293    use compio_buf::IntoInner;
294
295    use super::{ReadAt, Recv};
296    use crate::{BorrowedBuffer, BufferPool, OwnedBuffer, TakeBuffer};
297
298    /// Read a file at specified position into managed buffer.
299    pub struct ReadManagedAt<S> {
300        pub(crate) op: ReadAt<OwnedBuffer, S>,
301    }
302
303    impl<S> ReadManagedAt<S> {
304        /// Create [`ReadManagedAt`].
305        pub fn new(fd: S, offset: u64, pool: &BufferPool, len: usize) -> io::Result<Self> {
306            #[cfg(fusion)]
307            let pool = pool.as_poll();
308            Ok(Self {
309                op: ReadAt::new(fd, offset, pool.get_buffer(len)?),
310            })
311        }
312    }
313
314    impl<S> TakeBuffer for ReadManagedAt<S> {
315        type Buffer<'a> = BorrowedBuffer<'a>;
316        type BufferPool = BufferPool;
317
318        fn take_buffer(
319            self,
320            buffer_pool: &BufferPool,
321            result: io::Result<usize>,
322            _: u32,
323        ) -> io::Result<BorrowedBuffer<'_>> {
324            let result = result?;
325            #[cfg(fusion)]
326            let buffer_pool = buffer_pool.as_poll();
327            let slice = self.op.into_inner();
328            // Safety: result is valid
329            let res = unsafe { buffer_pool.create_proxy(slice, result) };
330            #[cfg(fusion)]
331            let res = BorrowedBuffer::new_poll(res);
332            Ok(res)
333        }
334    }
335
336    /// Receive data from remote into managed buffer.
337    pub struct RecvManaged<S> {
338        pub(crate) op: Recv<OwnedBuffer, S>,
339    }
340
341    impl<S> RecvManaged<S> {
342        /// Create [`RecvManaged`].
343        pub fn new(fd: S, pool: &BufferPool, len: usize) -> io::Result<Self> {
344            #[cfg(fusion)]
345            let pool = pool.as_poll();
346            Ok(Self {
347                op: Recv::new(fd, pool.get_buffer(len)?),
348            })
349        }
350    }
351
352    impl<S> TakeBuffer for RecvManaged<S> {
353        type Buffer<'a> = BorrowedBuffer<'a>;
354        type BufferPool = BufferPool;
355
356        fn take_buffer(
357            self,
358            buffer_pool: &Self::BufferPool,
359            result: io::Result<usize>,
360            _: u32,
361        ) -> io::Result<Self::Buffer<'_>> {
362            let result = result?;
363            #[cfg(fusion)]
364            let buffer_pool = buffer_pool.as_poll();
365            let slice = self.op.into_inner();
366            // Safety: result is valid
367            let res = unsafe { buffer_pool.create_proxy(slice, result) };
368            #[cfg(fusion)]
369            let res = BorrowedBuffer::new_poll(res);
370            Ok(res)
371        }
372    }
373}
374
375#[cfg(not(io_uring))]
376pub use managed::*;