Skip to main content

compio_driver/
op.rs

//! The async operations.
//!
//! Types in this module represent the low-level operations passed to the kernel.
//! An operation by itself doesn't perform anything.
//! You need to pass it to [`crate::Proactor`] and poll the driver.
6
7use std::{io, marker::PhantomPinned, mem::ManuallyDrop, net::Shutdown};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, SetLen};
10use pin_project_lite::pin_project;
11use socket2::{SockAddr, SockAddrStorage, socklen_t};
12
13#[cfg(linux_all)]
14pub use crate::sys::op::Splice;
15pub use crate::sys::op::{
16    Accept, Recv, RecvFrom, RecvFromVectored, RecvMsg, RecvVectored, Send, SendMsg, SendTo,
17    SendToVectored, SendVectored,
18};
19#[cfg(windows)]
20pub use crate::sys::op::{ConnectNamedPipe, DeviceIoControl};
21#[cfg(unix)]
22pub use crate::sys::op::{
23    CreateDir, CreateSocket, FileStat, HardLink, Interest, OpenFile, PathStat, PollOnce,
24    ReadVectored, ReadVectoredAt, Rename, Stat, Symlink, TruncateFile, Unlink, WriteVectored,
25    WriteVectoredAt,
26};
27#[cfg(io_uring)]
28pub use crate::sys::op::{ReadManaged, ReadManagedAt, RecvManaged};
29use crate::{Extra, OwnedFd, SharedFd, TakeBuffer, sys::aio::*};
30
/// Extension for a [`BufResult`] that keeps the length of its single
/// buffer in step with the length carried by the result.
pub trait BufResultExt {
    /// When the result is [`Ok`], pass the length it carries to
    /// [`SetLen::advance_to`] on the buffer.
    ///
    /// # Safety
    ///
    /// The length carried by the result must be valid for the buffer to
    /// advance to.
    unsafe fn map_advanced(self) -> Self;
}
40
/// Extension for a [`BufResult`] that keeps the length of its vectored
/// buffer in step with the length carried by the result.
pub trait VecBufResultExt {
    /// When the result is [`Ok`], pass the length it carries to
    /// [`SetLen::advance_vec_to`] on the buffer.
    ///
    /// # Safety
    ///
    /// The length carried by the result must be valid for the buffer to
    /// advance to.
    unsafe fn map_vec_advanced(self) -> Self;
}
50
51impl<T: SetLen + IoBuf> BufResultExt for BufResult<usize, T> {
52    unsafe fn map_advanced(self) -> Self {
53        unsafe {
54            self.map_res(|res| (res, ()))
55                .map_advanced()
56                .map_res(|(res, _)| res)
57        }
58    }
59}
60
61impl<T: SetLen + IoVectoredBuf> VecBufResultExt for BufResult<usize, T> {
62    unsafe fn map_vec_advanced(self) -> Self {
63        unsafe {
64            self.map_res(|res| (res, ()))
65                .map_vec_advanced()
66                .map_res(|(res, _)| res)
67        }
68    }
69}
70
71impl<T: SetLen + IoBuf, O> BufResultExt for BufResult<(usize, O), T> {
72    unsafe fn map_advanced(self) -> Self {
73        self.map(|(init, obj), mut buffer| {
74            unsafe {
75                buffer.advance_to(init);
76            }
77            ((init, obj), buffer)
78        })
79    }
80}
81
82impl<T: SetLen + IoVectoredBuf, O> VecBufResultExt for BufResult<(usize, O), T> {
83    unsafe fn map_vec_advanced(self) -> Self {
84        self.map(|(init, obj), mut buffer| {
85            unsafe {
86                buffer.advance_vec_to(init);
87            }
88            ((init, obj), buffer)
89        })
90    }
91}
92
93impl<T: SetLen + IoBuf, C: SetLen + IoBuf, O> BufResultExt
94    for BufResult<(usize, usize, O), (T, C)>
95{
96    unsafe fn map_advanced(self) -> Self {
97        self.map(
98            |(init_buffer, init_control, obj), (mut buffer, mut control)| {
99                unsafe {
100                    buffer.advance_to(init_buffer);
101                    control.advance_to(init_control);
102                }
103                ((init_buffer, init_control, obj), (buffer, control))
104            },
105        )
106    }
107}
108
109impl<T: SetLen + IoVectoredBuf, C: SetLen + IoBuf, O> VecBufResultExt
110    for BufResult<(usize, usize, O), (T, C)>
111{
112    unsafe fn map_vec_advanced(self) -> Self {
113        self.map(
114            |(init_buffer, init_control, obj), (mut buffer, mut control)| {
115                unsafe {
116                    buffer.advance_vec_to(init_buffer);
117                    control.advance_to(init_control);
118                }
119                ((init_buffer, init_control, obj), (buffer, control))
120            },
121        )
122    }
123}
124
/// Result post-processing shared by [`RecvFrom`], [`RecvFromVectored`] and
/// [`RecvMsg`].
pub trait RecvResultExt {
    /// The type produced by [`RecvResultExt::map_addr`].
    type RecvResult;

    /// Build a [`SockAddr`] and attach it to the result when the result is
    /// [`Ok`].
    fn map_addr(self) -> Self::RecvResult;
}
133
134impl<T> RecvResultExt for BufResult<usize, (T, SockAddrStorage, socklen_t)> {
135    type RecvResult = BufResult<(usize, SockAddr), T>;
136
137    fn map_addr(self) -> Self::RecvResult {
138        self.map_buffer(|(buffer, addr_buffer, addr_size)| (buffer, addr_buffer, addr_size, 0))
139            .map_addr()
140            .map_res(|(res, _, addr)| (res, addr))
141    }
142}
143
144impl<T> RecvResultExt for BufResult<usize, (T, SockAddrStorage, socklen_t, usize)> {
145    type RecvResult = BufResult<(usize, usize, SockAddr), T>;
146
147    fn map_addr(self) -> Self::RecvResult {
148        self.map2(
149            |res, (buffer, addr_buffer, addr_size, len)| {
150                let addr = unsafe { SockAddr::new(addr_buffer, addr_size) };
151                ((res, len, addr), buffer)
152            },
153            |(buffer, ..)| buffer,
154        )
155    }
156}
157
/// Helper for extracting the buffer from the result of a managed-buffer
/// operation such as [`ReadManagedAt`] or [`RecvManaged`].
pub trait ResultTakeBuffer {
    /// The pool type the buffer is taken from.
    type BufferPool;
    /// The buffer type handed back; it may borrow from the pool.
    type Buffer<'a>;

    /// Consume the result and take its buffer out of `pool`.
    fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>>;
}
168
169impl<T: TakeBuffer> ResultTakeBuffer for (BufResult<usize, T>, Extra) {
170    type Buffer<'a> = T::Buffer<'a>;
171    type BufferPool = T::BufferPool;
172
173    fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>> {
174        let (BufResult(result, op), extra) = self;
175        op.take_buffer(pool, result, extra.buffer_id()?)
176    }
177}
178
179pin_project! {
180    /// Spawn a blocking function in the thread pool.
181    pub struct Asyncify<F, D> {
182        pub(crate) f: Option<F>,
183        pub(crate) data: Option<D>,
184        _p: PhantomPinned,
185    }
186}
187
188impl<F, D> Asyncify<F, D> {
189    /// Create [`Asyncify`].
190    pub fn new(f: F) -> Self {
191        Self {
192            f: Some(f),
193            data: None,
194            _p: PhantomPinned,
195        }
196    }
197}
198
199impl<F, D> IntoInner for Asyncify<F, D> {
200    type Inner = D;
201
202    fn into_inner(mut self) -> Self::Inner {
203        self.data.take().expect("the data should not be None")
204    }
205}
206
207pin_project! {
208    /// Spawn a blocking function with a file descriptor in the thread pool.
209    pub struct AsyncifyFd<S, F, D> {
210        pub(crate) fd: SharedFd<S>,
211        pub(crate) f: Option<F>,
212        pub(crate) data: Option<D>,
213        _p: PhantomPinned,
214    }
215}
216
217impl<S, F, D> AsyncifyFd<S, F, D> {
218    /// Create [`AsyncifyFd`].
219    pub fn new(fd: SharedFd<S>, f: F) -> Self {
220        Self {
221            fd,
222            f: Some(f),
223            data: None,
224            _p: PhantomPinned,
225        }
226    }
227}
228
229impl<S, F, D> IntoInner for AsyncifyFd<S, F, D> {
230    type Inner = D;
231
232    fn into_inner(mut self) -> Self::Inner {
233        self.data.take().expect("the data should not be None")
234    }
235}
236
237/// Close the file fd.
238pub struct CloseFile {
239    pub(crate) fd: ManuallyDrop<OwnedFd>,
240}
241
242impl CloseFile {
243    /// Create [`CloseFile`].
244    pub fn new(fd: OwnedFd) -> Self {
245        Self {
246            fd: ManuallyDrop::new(fd),
247        }
248    }
249}
250
251pin_project! {
252    /// Read a file at specified position into specified buffer.
253    #[derive(Debug)]
254    pub struct ReadAt<T: IoBufMut, S> {
255        pub(crate) fd: S,
256        pub(crate) offset: u64,
257        #[pin]
258        pub(crate) buffer: T,
259        pub(crate) aiocb: aiocb,
260        _p: PhantomPinned,
261    }
262}
263
264impl<T: IoBufMut, S> ReadAt<T, S> {
265    /// Create [`ReadAt`].
266    pub fn new(fd: S, offset: u64, buffer: T) -> Self {
267        Self {
268            fd,
269            offset,
270            buffer,
271            aiocb: new_aiocb(),
272            _p: PhantomPinned,
273        }
274    }
275}
276
277impl<T: IoBufMut, S> IntoInner for ReadAt<T, S> {
278    type Inner = T;
279
280    fn into_inner(self) -> Self::Inner {
281        self.buffer
282    }
283}
284
285pin_project! {
286    /// Write a file at specified position from specified buffer.
287    #[derive(Debug)]
288    pub struct WriteAt<T: IoBuf, S> {
289        pub(crate) fd: S,
290        pub(crate) offset: u64,
291        #[pin]
292        pub(crate) buffer: T,
293        pub(crate) aiocb: aiocb,
294        _p: PhantomPinned,
295    }
296}
297
298impl<T: IoBuf, S> WriteAt<T, S> {
299    /// Create [`WriteAt`].
300    pub fn new(fd: S, offset: u64, buffer: T) -> Self {
301        Self {
302            fd,
303            offset,
304            buffer,
305            aiocb: new_aiocb(),
306            _p: PhantomPinned,
307        }
308    }
309}
310
311impl<T: IoBuf, S> IntoInner for WriteAt<T, S> {
312    type Inner = T;
313
314    fn into_inner(self) -> Self::Inner {
315        self.buffer
316    }
317}
318
319pin_project! {
320    /// Read a file.
321    pub struct Read<T: IoBufMut, S> {
322        pub(crate) fd: S,
323        #[pin]
324        pub(crate) buffer: T,
325        _p: PhantomPinned,
326    }
327}
328
329impl<T: IoBufMut, S> Read<T, S> {
330    /// Create [`Read`].
331    pub fn new(fd: S, buffer: T) -> Self {
332        Self {
333            fd,
334            buffer,
335            _p: PhantomPinned,
336        }
337    }
338}
339
340impl<T: IoBufMut, S> IntoInner for Read<T, S> {
341    type Inner = T;
342
343    fn into_inner(self) -> Self::Inner {
344        self.buffer
345    }
346}
347
348/// Write a file.
349pub struct Write<T: IoBuf, S> {
350    pub(crate) fd: S,
351    pub(crate) buffer: T,
352    _p: PhantomPinned,
353}
354
355impl<T: IoBuf, S> Write<T, S> {
356    /// Create [`Write`].
357    pub fn new(fd: S, buffer: T) -> Self {
358        Self {
359            fd,
360            buffer,
361            _p: PhantomPinned,
362        }
363    }
364}
365
366impl<T: IoBuf, S> IntoInner for Write<T, S> {
367    type Inner = T;
368
369    fn into_inner(self) -> Self::Inner {
370        self.buffer
371    }
372}
373
374pin_project! {
375    /// Sync data to the disk.
376    pub struct Sync<S> {
377        pub(crate) fd: S,
378        pub(crate) datasync: bool,
379        pub(crate) aiocb: aiocb,
380    }
381}
382
383impl<S> Sync<S> {
384    /// Create [`Sync`].
385    ///
386    /// If `datasync` is `true`, the file metadata may not be synchronized.
387    pub fn new(fd: S, datasync: bool) -> Self {
388        Self {
389            fd,
390            datasync,
391            aiocb: new_aiocb(),
392        }
393    }
394}
395
/// Operation that shuts down a socket.
pub struct ShutdownSocket<S> {
    pub(crate) fd: S,
    /// Which direction(s) of the socket to shut down.
    pub(crate) how: Shutdown,
}

impl<S> ShutdownSocket<S> {
    /// Create [`ShutdownSocket`] for `fd` with the shutdown mode `how`.
    pub fn new(fd: S, how: Shutdown) -> Self {
        ShutdownSocket { fd, how }
    }
}
408
409/// Close socket fd.
410pub struct CloseSocket {
411    pub(crate) fd: ManuallyDrop<OwnedFd>,
412}
413
414impl CloseSocket {
415    /// Create [`CloseSocket`].
416    pub fn new(fd: OwnedFd) -> Self {
417        Self {
418            fd: ManuallyDrop::new(fd),
419        }
420    }
421}
422
423/// Connect to a remote address.
424pub struct Connect<S> {
425    pub(crate) fd: S,
426    pub(crate) addr: SockAddr,
427}
428
429impl<S> Connect<S> {
430    /// Create [`Connect`]. `fd` should be bound.
431    pub fn new(fd: S, addr: SockAddr) -> Self {
432        Self { fd, addr }
433    }
434}
435
436#[cfg(any(not(io_uring), fusion))]
437pub(crate) mod managed {
438    use std::io;
439
440    use compio_buf::IntoInner;
441    use pin_project_lite::pin_project;
442
443    use super::{Read, ReadAt, Recv};
444    use crate::{BorrowedBuffer, BufferPool, OwnedBuffer, TakeBuffer};
445
446    pin_project! {
447        /// Read a file at specified position into managed buffer.
448        pub struct ReadManagedAt<S> {
449            #[pin]
450            pub(crate) op: ReadAt<OwnedBuffer, S>,
451        }
452    }
453
454    impl<S> ReadManagedAt<S> {
455        /// Create [`ReadManagedAt`].
456        pub fn new(fd: S, offset: u64, pool: &BufferPool, len: usize) -> io::Result<Self> {
457            #[cfg(fusion)]
458            let pool = pool.as_poll();
459            Ok(Self {
460                op: ReadAt::new(fd, offset, pool.get_buffer(len)?),
461            })
462        }
463    }
464
465    impl<S> TakeBuffer for ReadManagedAt<S> {
466        type Buffer<'a> = BorrowedBuffer<'a>;
467        type BufferPool = BufferPool;
468
469        fn take_buffer(
470            self,
471            buffer_pool: &BufferPool,
472            result: io::Result<usize>,
473            _: u16,
474        ) -> io::Result<BorrowedBuffer<'_>> {
475            let result = result?;
476            #[cfg(fusion)]
477            let buffer_pool = buffer_pool.as_poll();
478            let slice = self.op.into_inner();
479            // SAFETY: result is valid
480            let res = unsafe { buffer_pool.create_proxy(slice, result) };
481            #[cfg(fusion)]
482            let res = BorrowedBuffer::new_poll(res);
483            Ok(res)
484        }
485    }
486
487    pin_project! {
488        /// Read a file into managed buffer.
489        pub struct ReadManaged<S> {
490            #[pin]
491            pub(crate) op: Read<OwnedBuffer, S>,
492        }
493    }
494
495    impl<S> ReadManaged<S> {
496        /// Create [`ReadManaged`].
497        pub fn new(fd: S, pool: &BufferPool, len: usize) -> io::Result<Self> {
498            #[cfg(fusion)]
499            let pool = pool.as_poll();
500            Ok(Self {
501                op: Read::new(fd, pool.get_buffer(len)?),
502            })
503        }
504    }
505
506    impl<S> TakeBuffer for ReadManaged<S> {
507        type Buffer<'a> = BorrowedBuffer<'a>;
508        type BufferPool = BufferPool;
509
510        fn take_buffer(
511            self,
512            buffer_pool: &Self::BufferPool,
513            result: io::Result<usize>,
514            _: u16,
515        ) -> io::Result<Self::Buffer<'_>> {
516            let result = result?;
517            #[cfg(fusion)]
518            let buffer_pool = buffer_pool.as_poll();
519            let slice = self.op.into_inner();
520            // SAFETY: result is valid
521            let res = unsafe { buffer_pool.create_proxy(slice, result) };
522            #[cfg(fusion)]
523            let res = BorrowedBuffer::new_poll(res);
524            Ok(res)
525        }
526    }
527
528    pin_project! {
529        /// Receive data from remote into managed buffer.
530        ///
531        /// It is only used for socket operations. If you want to read from a pipe,
532        /// use [`ReadManaged`].
533        pub struct RecvManaged<S> {
534            #[pin]
535            pub(crate) op: Recv<OwnedBuffer, S>,
536        }
537    }
538
539    impl<S> RecvManaged<S> {
540        /// Create [`RecvManaged`].
541        pub fn new(fd: S, pool: &BufferPool, len: usize, flags: i32) -> io::Result<Self> {
542            #[cfg(fusion)]
543            let pool = pool.as_poll();
544            Ok(Self {
545                op: Recv::new(fd, pool.get_buffer(len)?, flags),
546            })
547        }
548    }
549
550    impl<S> TakeBuffer for RecvManaged<S> {
551        type Buffer<'a> = BorrowedBuffer<'a>;
552        type BufferPool = BufferPool;
553
554        fn take_buffer(
555            self,
556            buffer_pool: &Self::BufferPool,
557            result: io::Result<usize>,
558            _: u16,
559        ) -> io::Result<Self::Buffer<'_>> {
560            let result = result?;
561            #[cfg(fusion)]
562            let buffer_pool = buffer_pool.as_poll();
563            let slice = self.op.into_inner();
564            // SAFETY: result is valid
565            let res = unsafe { buffer_pool.create_proxy(slice, result) };
566            #[cfg(fusion)]
567            let res = BorrowedBuffer::new_poll(res);
568            Ok(res)
569        }
570    }
571}
572
573#[cfg(not(io_uring))]
574pub use managed::*;