compio_driver/
op.rs

//! The async operations.
//!
//! Types in this module represent the low-level operations passed to the kernel.
//! An operation by itself doesn't perform anything.
//! You need to pass it to [`crate::Proactor`] and poll the driver.
6
7use std::{io, marker::PhantomPinned, mem::ManuallyDrop, net::Shutdown};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, SetBufInit};
10use socket2::{SockAddr, SockAddrStorage, socklen_t};
11
12pub use crate::sys::op::{
13    Accept, Recv, RecvFrom, RecvFromVectored, RecvMsg, RecvVectored, Send, SendMsg, SendTo,
14    SendToVectored, SendVectored,
15};
16#[cfg(windows)]
17pub use crate::sys::op::{ConnectNamedPipe, DeviceIoControl};
18#[cfg(unix)]
19pub use crate::sys::op::{
20    CreateDir, CreateSocket, FileStat, HardLink, Interest, OpenFile, PathStat, PollOnce,
21    ReadVectoredAt, Rename, Symlink, Unlink, WriteVectoredAt,
22};
23#[cfg(io_uring)]
24pub use crate::sys::op::{ReadManagedAt, RecvManaged};
25use crate::{OwnedFd, SharedFd, TakeBuffer};
26
/// Extension trait that brings a buffer's initialized length in line with the
/// byte count carried inside a [`BufResult`].
pub trait BufResultExt {
    /// Call [`SetBufInit::set_buf_init`] on the carried buffer(s) if the
    /// result is [`Ok`], then hand the (otherwise unchanged) value back.
    fn map_advanced(self) -> Self;
}
32
33impl<T: SetBufInit> BufResultExt for BufResult<usize, T> {
34    fn map_advanced(self) -> Self {
35        self.map_res(|res| (res, ()))
36            .map_advanced()
37            .map_res(|(res, _)| res)
38    }
39}
40
41impl<T: SetBufInit, O> BufResultExt for BufResult<(usize, O), T> {
42    fn map_advanced(self) -> Self {
43        self.map(|(init, obj), mut buffer| {
44            unsafe {
45                buffer.set_buf_init(init);
46            }
47            ((init, obj), buffer)
48        })
49    }
50}
51
52impl<T: SetBufInit, C: SetBufInit, O> BufResultExt for BufResult<(usize, usize, O), (T, C)> {
53    fn map_advanced(self) -> Self {
54        self.map(
55            |(init_buffer, init_control, obj), (mut buffer, mut control)| {
56                unsafe {
57                    buffer.set_buf_init(init_buffer);
58                    control.set_buf_init(init_control);
59                }
60                ((init_buffer, init_control, obj), (buffer, control))
61            },
62        )
63    }
64}
65
/// Helper trait for the results of [`RecvFrom`], [`RecvFromVectored`] and
/// [`RecvMsg`].
pub trait RecvResultExt {
    /// The type produced by [`RecvResultExt::map_addr`].
    type RecvResult;

    /// Create a [`SockAddr`] from the raw address storage if the result is
    /// [`Ok`].
    fn map_addr(self) -> Self::RecvResult;
}
74
75impl<T> RecvResultExt for BufResult<usize, (T, SockAddrStorage, socklen_t)> {
76    type RecvResult = BufResult<(usize, SockAddr), T>;
77
78    fn map_addr(self) -> Self::RecvResult {
79        self.map_buffer(|(buffer, addr_buffer, addr_size)| (buffer, addr_buffer, addr_size, 0))
80            .map_addr()
81            .map_res(|(res, _, addr)| (res, addr))
82    }
83}
84
85impl<T> RecvResultExt for BufResult<usize, (T, SockAddrStorage, socklen_t, usize)> {
86    type RecvResult = BufResult<(usize, usize, SockAddr), T>;
87
88    fn map_addr(self) -> Self::RecvResult {
89        self.map2(
90            |res, (buffer, addr_buffer, addr_size, len)| {
91                let addr = unsafe { SockAddr::new(addr_buffer, addr_size) };
92                ((res, len, addr), buffer)
93            },
94            |(buffer, ..)| buffer,
95        )
96    }
97}
98
/// Helper trait for the results of [`ReadManagedAt`] and [`RecvManaged`].
pub trait ResultTakeBuffer {
    /// The buffer pool type the op draws its buffer from.
    type BufferPool;
    /// The buffer type handed back to the caller; it may borrow from the pool.
    type Buffer<'a>;

    /// Consume the result and take the buffer out of it, using `pool`.
    fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>>;
}
109
110impl<T: TakeBuffer> ResultTakeBuffer for (BufResult<usize, T>, u32) {
111    type Buffer<'a> = T::Buffer<'a>;
112    type BufferPool = T::BufferPool;
113
114    fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>> {
115        let (BufResult(result, op), flags) = self;
116        op.take_buffer(pool, result, flags)
117    }
118}
119
/// An operation that runs a blocking function in the thread pool.
pub struct Asyncify<F, D> {
    /// The function to run; always `Some` right after [`Asyncify::new`].
    pub(crate) f: Option<F>,
    /// Output slot; starts as `None` and is what `into_inner` returns.
    pub(crate) data: Option<D>,
    /// Makes the type `!Unpin`.
    _p: PhantomPinned,
}

impl<F, D> Asyncify<F, D> {
    /// Create [`Asyncify`] holding `f`, with no output data yet.
    pub fn new(f: F) -> Self {
        let f = Some(f);
        Self {
            f,
            data: None,
            _p: PhantomPinned,
        }
    }
}
137
138impl<F, D> IntoInner for Asyncify<F, D> {
139    type Inner = D;
140
141    fn into_inner(mut self) -> Self::Inner {
142        self.data.take().expect("the data should not be None")
143    }
144}
145
146/// Spawn a blocking function with a file descriptor in the thread pool.
147pub struct AsyncifyFd<S, F, D> {
148    pub(crate) fd: SharedFd<S>,
149    pub(crate) f: Option<F>,
150    pub(crate) data: Option<D>,
151    _p: PhantomPinned,
152}
153
154impl<S, F, D> AsyncifyFd<S, F, D> {
155    /// Create [`AsyncifyFd`].
156    pub fn new(fd: SharedFd<S>, f: F) -> Self {
157        Self {
158            fd,
159            f: Some(f),
160            data: None,
161            _p: PhantomPinned,
162        }
163    }
164}
165
166impl<S, F, D> IntoInner for AsyncifyFd<S, F, D> {
167    type Inner = D;
168
169    fn into_inner(mut self) -> Self::Inner {
170        self.data.take().expect("the data should not be None")
171    }
172}
173
174/// Close the file fd.
175pub struct CloseFile {
176    pub(crate) fd: ManuallyDrop<OwnedFd>,
177}
178
179impl CloseFile {
180    /// Create [`CloseFile`].
181    pub fn new(fd: OwnedFd) -> Self {
182        Self {
183            fd: ManuallyDrop::new(fd),
184        }
185    }
186}
187
188/// Read a file at specified position into specified buffer.
189#[derive(Debug)]
190pub struct ReadAt<T: IoBufMut, S> {
191    pub(crate) fd: S,
192    pub(crate) offset: u64,
193    pub(crate) buffer: T,
194    #[cfg(aio)]
195    pub(crate) aiocb: libc::aiocb,
196    _p: PhantomPinned,
197}
198
199impl<T: IoBufMut, S> ReadAt<T, S> {
200    /// Create [`ReadAt`].
201    pub fn new(fd: S, offset: u64, buffer: T) -> Self {
202        Self {
203            fd,
204            offset,
205            buffer,
206            #[cfg(aio)]
207            aiocb: unsafe { std::mem::zeroed() },
208            _p: PhantomPinned,
209        }
210    }
211}
212
213impl<T: IoBufMut, S> IntoInner for ReadAt<T, S> {
214    type Inner = T;
215
216    fn into_inner(self) -> Self::Inner {
217        self.buffer
218    }
219}
220
221/// Write a file at specified position from specified buffer.
222#[derive(Debug)]
223pub struct WriteAt<T: IoBuf, S> {
224    pub(crate) fd: S,
225    pub(crate) offset: u64,
226    pub(crate) buffer: T,
227    #[cfg(aio)]
228    pub(crate) aiocb: libc::aiocb,
229    _p: PhantomPinned,
230}
231
232impl<T: IoBuf, S> WriteAt<T, S> {
233    /// Create [`WriteAt`].
234    pub fn new(fd: S, offset: u64, buffer: T) -> Self {
235        Self {
236            fd,
237            offset,
238            buffer,
239            #[cfg(aio)]
240            aiocb: unsafe { std::mem::zeroed() },
241            _p: PhantomPinned,
242        }
243    }
244}
245
246impl<T: IoBuf, S> IntoInner for WriteAt<T, S> {
247    type Inner = T;
248
249    fn into_inner(self) -> Self::Inner {
250        self.buffer
251    }
252}
253
/// Sync data to the disk.
pub struct Sync<S> {
    /// The file to synchronize.
    pub(crate) fd: S,
    /// Whether the file metadata may be left unsynchronized.
    // NOTE(review): presumably not read on every backend, hence the lint
    // allowance — confirm.
    #[allow(dead_code)]
    pub(crate) datasync: bool,
    /// Control block, present only when the `aio` cfg is set.
    #[cfg(aio)]
    pub(crate) aiocb: libc::aiocb,
}

impl<S> Sync<S> {
    /// Create [`Sync`].
    ///
    /// If `datasync` is `true`, the file metadata may not be synchronized.
    pub fn new(fd: S, datasync: bool) -> Self {
        // SAFETY: `libc::aiocb` is a C struct; an all-zero value is assumed to
        // be a valid starting state for it.
        #[cfg(aio)]
        let aiocb: libc::aiocb = unsafe { std::mem::zeroed() };
        Self {
            fd,
            datasync,
            #[cfg(aio)]
            aiocb,
        }
    }
}
276
/// Shutdown a socket.
pub struct ShutdownSocket<S> {
    /// The socket to shut down.
    pub(crate) fd: S,
    /// Which direction(s) of the connection to shut down.
    pub(crate) how: Shutdown,
}

impl<S> ShutdownSocket<S> {
    /// Create [`ShutdownSocket`] for `fd` with the given shutdown mode.
    pub fn new(fd: S, how: Shutdown) -> Self {
        ShutdownSocket { how, fd }
    }
}
289
290/// Close socket fd.
291pub struct CloseSocket {
292    pub(crate) fd: ManuallyDrop<OwnedFd>,
293}
294
295impl CloseSocket {
296    /// Create [`CloseSocket`].
297    pub fn new(fd: OwnedFd) -> Self {
298        Self {
299            fd: ManuallyDrop::new(fd),
300        }
301    }
302}
303
304/// Connect to a remote address.
305pub struct Connect<S> {
306    pub(crate) fd: S,
307    pub(crate) addr: SockAddr,
308}
309
310impl<S> Connect<S> {
311    /// Create [`Connect`]. `fd` should be bound.
312    pub fn new(fd: S, addr: SockAddr) -> Self {
313        Self { fd, addr }
314    }
315}
316
317#[cfg(any(not(io_uring), fusion))]
318pub(crate) mod managed {
319    use std::io;
320
321    use compio_buf::IntoInner;
322
323    use super::{ReadAt, Recv};
324    use crate::{BorrowedBuffer, BufferPool, OwnedBuffer, TakeBuffer};
325
326    /// Read a file at specified position into managed buffer.
327    pub struct ReadManagedAt<S> {
328        pub(crate) op: ReadAt<OwnedBuffer, S>,
329    }
330
331    impl<S> ReadManagedAt<S> {
332        /// Create [`ReadManagedAt`].
333        pub fn new(fd: S, offset: u64, pool: &BufferPool, len: usize) -> io::Result<Self> {
334            #[cfg(fusion)]
335            let pool = pool.as_poll();
336            Ok(Self {
337                op: ReadAt::new(fd, offset, pool.get_buffer(len)?),
338            })
339        }
340    }
341
342    impl<S> TakeBuffer for ReadManagedAt<S> {
343        type Buffer<'a> = BorrowedBuffer<'a>;
344        type BufferPool = BufferPool;
345
346        fn take_buffer(
347            self,
348            buffer_pool: &BufferPool,
349            result: io::Result<usize>,
350            _: u32,
351        ) -> io::Result<BorrowedBuffer<'_>> {
352            let result = result?;
353            #[cfg(fusion)]
354            let buffer_pool = buffer_pool.as_poll();
355            let slice = self.op.into_inner();
356            // SAFETY: result is valid
357            let res = unsafe { buffer_pool.create_proxy(slice, result) };
358            #[cfg(fusion)]
359            let res = BorrowedBuffer::new_poll(res);
360            Ok(res)
361        }
362    }
363
364    /// Receive data from remote into managed buffer.
365    pub struct RecvManaged<S> {
366        pub(crate) op: Recv<OwnedBuffer, S>,
367    }
368
369    impl<S> RecvManaged<S> {
370        /// Create [`RecvManaged`].
371        pub fn new(fd: S, pool: &BufferPool, len: usize) -> io::Result<Self> {
372            #[cfg(fusion)]
373            let pool = pool.as_poll();
374            Ok(Self {
375                op: Recv::new(fd, pool.get_buffer(len)?),
376            })
377        }
378    }
379
380    impl<S> TakeBuffer for RecvManaged<S> {
381        type Buffer<'a> = BorrowedBuffer<'a>;
382        type BufferPool = BufferPool;
383
384        fn take_buffer(
385            self,
386            buffer_pool: &Self::BufferPool,
387            result: io::Result<usize>,
388            _: u32,
389        ) -> io::Result<Self::Buffer<'_>> {
390            let result = result?;
391            #[cfg(fusion)]
392            let buffer_pool = buffer_pool.as_poll();
393            let slice = self.op.into_inner();
394            // SAFETY: result is valid
395            let res = unsafe { buffer_pool.create_proxy(slice, result) };
396            #[cfg(fusion)]
397            let res = BorrowedBuffer::new_poll(res);
398            Ok(res)
399        }
400    }
401}
402
403#[cfg(not(io_uring))]
404pub use managed::*;