compio_driver/
lib.rs

1//! The platform-specified driver.
2//! Some types differ by compilation target.
3
4#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
5#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
6#![warn(missing_docs)]
7
8#[cfg(all(
9    target_os = "linux",
10    not(feature = "io-uring"),
11    not(feature = "polling")
12))]
13compile_error!("You must choose at least one of these features: [\"io-uring\", \"polling\"]");
14
15use std::{
16    io,
17    task::{Poll, Waker},
18    time::Duration,
19};
20
21use compio_buf::BufResult;
22use compio_log::instrument;
23
24mod key;
25pub use key::Key;
26
27pub mod op;
28#[cfg(unix)]
29#[cfg_attr(docsrs, doc(cfg(all())))]
30mod unix;
31#[cfg(unix)]
32use unix::Overlapped;
33
34mod asyncify;
35pub use asyncify::*;
36
37mod fd;
38pub use fd::*;
39
40mod driver_type;
41pub use driver_type::*;
42
43mod buffer_pool;
44pub use buffer_pool::*;
45
46cfg_if::cfg_if! {
47    if #[cfg(windows)] {
48        #[path = "iocp/mod.rs"]
49        mod sys;
50    } else if #[cfg(fusion)] {
51        #[path = "fusion/mod.rs"]
52        mod sys;
53    } else if #[cfg(io_uring)] {
54        #[path = "iour/mod.rs"]
55        mod sys;
56    } else if #[cfg(unix)] {
57        #[path = "poll/mod.rs"]
58        mod sys;
59    }
60}
61
62pub use sys::*;
63
64#[cfg(windows)]
65#[macro_export]
66#[doc(hidden)]
67macro_rules! syscall {
68    (BOOL, $e:expr) => {
69        $crate::syscall!($e, == 0)
70    };
71    (SOCKET, $e:expr) => {
72        $crate::syscall!($e, != 0)
73    };
74    (HANDLE, $e:expr) => {
75        $crate::syscall!($e, == ::windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE)
76    };
77    ($e:expr, $op: tt $rhs: expr) => {{
78        #[allow(unused_unsafe)]
79        let res = unsafe { $e };
80        if res $op $rhs {
81            Err(::std::io::Error::last_os_error())
82        } else {
83            Ok(res)
84        }
85    }};
86}
87
88/// Helper macro to execute a system call
89#[cfg(unix)]
90#[macro_export]
91#[doc(hidden)]
92macro_rules! syscall {
93    (break $e:expr) => {
94        loop {
95            match $crate::syscall!($e) {
96                Ok(fd) => break ::std::task::Poll::Ready(Ok(fd as usize)),
97                Err(e) if e.kind() == ::std::io::ErrorKind::WouldBlock || e.raw_os_error() == Some(::libc::EINPROGRESS)
98                    => break ::std::task::Poll::Pending,
99                Err(e) if e.kind() == ::std::io::ErrorKind::Interrupted => {},
100                Err(e) => break ::std::task::Poll::Ready(Err(e)),
101            }
102        }
103    };
104    ($e:expr, $f:ident($fd:expr)) => {
105        match $crate::syscall!(break $e) {
106            ::std::task::Poll::Pending => Ok($crate::sys::Decision::$f($fd)),
107            ::std::task::Poll::Ready(Ok(res)) => Ok($crate::sys::Decision::Completed(res)),
108            ::std::task::Poll::Ready(Err(e)) => Err(e),
109        }
110    };
111    ($e:expr) => {{
112        #[allow(unused_unsafe)]
113        let res = unsafe { $e };
114        if res == -1 {
115            Err(::std::io::Error::last_os_error())
116        } else {
117            Ok(res)
118        }
119    }};
120}
121
122#[macro_export]
123#[doc(hidden)]
124macro_rules! impl_raw_fd {
125    ($t:ty, $it:ty, $inner:ident) => {
126        impl $crate::AsRawFd for $t {
127            fn as_raw_fd(&self) -> $crate::RawFd {
128                self.$inner.as_raw_fd()
129            }
130        }
131        #[cfg(unix)]
132        impl std::os::fd::AsFd for $t {
133            fn as_fd(&self) -> std::os::fd::BorrowedFd<'_> {
134                self.$inner.as_fd()
135            }
136        }
137        #[cfg(unix)]
138        impl std::os::fd::FromRawFd for $t {
139            unsafe fn from_raw_fd(fd: $crate::RawFd) -> Self {
140                Self {
141                    $inner: std::os::fd::FromRawFd::from_raw_fd(fd),
142                }
143            }
144        }
145        impl $crate::ToSharedFd<$it> for $t {
146            fn to_shared_fd(&self) -> $crate::SharedFd<$it> {
147                self.$inner.to_shared_fd()
148            }
149        }
150    };
151    ($t:ty, $it:ty, $inner:ident,file) => {
152        $crate::impl_raw_fd!($t, $it, $inner);
153        #[cfg(windows)]
154        impl std::os::windows::io::FromRawHandle for $t {
155            unsafe fn from_raw_handle(handle: std::os::windows::io::RawHandle) -> Self {
156                Self {
157                    $inner: std::os::windows::io::FromRawHandle::from_raw_handle(handle),
158                }
159            }
160        }
161        #[cfg(windows)]
162        impl std::os::windows::io::AsHandle for $t {
163            fn as_handle(&self) -> std::os::windows::io::BorrowedHandle {
164                self.$inner.as_handle()
165            }
166        }
167        #[cfg(windows)]
168        impl std::os::windows::io::AsRawHandle for $t {
169            fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
170                self.$inner.as_raw_handle()
171            }
172        }
173    };
174    ($t:ty, $it:ty, $inner:ident,socket) => {
175        $crate::impl_raw_fd!($t, $it, $inner);
176        #[cfg(windows)]
177        impl std::os::windows::io::FromRawSocket for $t {
178            unsafe fn from_raw_socket(sock: std::os::windows::io::RawSocket) -> Self {
179                Self {
180                    $inner: std::os::windows::io::FromRawSocket::from_raw_socket(sock),
181                }
182            }
183        }
184        #[cfg(windows)]
185        impl std::os::windows::io::AsSocket for $t {
186            fn as_socket(&self) -> std::os::windows::io::BorrowedSocket {
187                self.$inner.as_socket()
188            }
189        }
190        #[cfg(windows)]
191        impl std::os::windows::io::AsRawSocket for $t {
192            fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
193                self.$inner.as_raw_socket()
194            }
195        }
196    };
197}
198
199/// The return type of [`Proactor::push`].
200pub enum PushEntry<K, R> {
201    /// The operation is pushed to the submission queue.
202    Pending(K),
203    /// The operation is ready and returns.
204    Ready(R),
205}
206
207impl<K, R> PushEntry<K, R> {
208    /// Get if the current variant is [`PushEntry::Ready`].
209    pub const fn is_ready(&self) -> bool {
210        matches!(self, Self::Ready(_))
211    }
212
213    /// Take the ready variant if exists.
214    pub fn take_ready(self) -> Option<R> {
215        match self {
216            Self::Pending(_) => None,
217            Self::Ready(res) => Some(res),
218        }
219    }
220
221    /// Map the [`PushEntry::Pending`] branch.
222    pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
223        match self {
224            Self::Pending(k) => PushEntry::Pending(f(k)),
225            Self::Ready(r) => PushEntry::Ready(r),
226        }
227    }
228
229    /// Map the [`PushEntry::Ready`] branch.
230    pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
231        match self {
232            Self::Pending(k) => PushEntry::Pending(k),
233            Self::Ready(r) => PushEntry::Ready(f(r)),
234        }
235    }
236}
237
238/// Low-level actions of completion-based IO.
239/// It owns the operations to keep the driver safe.
240pub struct Proactor {
241    driver: Driver,
242}
243
244impl Proactor {
245    /// Create [`Proactor`] with 1024 entries.
246    pub fn new() -> io::Result<Self> {
247        Self::builder().build()
248    }
249
250    /// Create [`ProactorBuilder`] to config the proactor.
251    pub fn builder() -> ProactorBuilder {
252        ProactorBuilder::new()
253    }
254
255    fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
256        Ok(Self {
257            driver: Driver::new(builder)?,
258        })
259    }
260
261    /// Attach an fd to the driver.
262    ///
263    /// ## Platform specific
264    /// * IOCP: it will be attached to the completion port. An fd could only be
265    ///   attached to one driver, and could only be attached once, even if you
266    ///   `try_clone` it.
267    /// * io-uring & polling: it will do nothing but return `Ok(())`.
268    pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
269        self.driver.attach(fd)
270    }
271
272    /// Cancel an operation with the pushed user-defined data.
273    ///
274    /// The cancellation is not reliable. The underlying operation may continue,
275    /// but just don't return from [`Proactor::poll`]. Therefore, although an
276    /// operation is cancelled, you should not reuse its `user_data`.
277    pub fn cancel<T: OpCode>(&mut self, mut op: Key<T>) -> Option<BufResult<usize, T>> {
278        instrument!(compio_log::Level::DEBUG, "cancel", ?op);
279        if op.set_cancelled() {
280            // SAFETY: completed.
281            Some(unsafe { op.into_inner() })
282        } else {
283            self.driver
284                .cancel(&mut unsafe { Key::<dyn OpCode>::new_unchecked(op.user_data()) });
285            None
286        }
287    }
288
289    /// Push an operation into the driver, and return the unique key, called
290    /// user-defined data, associated with it.
291    pub fn push<T: OpCode + 'static>(&mut self, op: T) -> PushEntry<Key<T>, BufResult<usize, T>> {
292        let mut op = self.driver.create_op(op);
293        match self
294            .driver
295            .push(&mut unsafe { Key::<dyn OpCode>::new_unchecked(op.user_data()) })
296        {
297            Poll::Pending => PushEntry::Pending(op),
298            Poll::Ready(res) => {
299                op.set_result(res);
300                // SAFETY: just completed.
301                PushEntry::Ready(unsafe { op.into_inner() })
302            }
303        }
304    }
305
306    /// Poll the driver and get completed entries.
307    /// You need to call [`Proactor::pop`] to get the pushed
308    /// operations.
309    pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
310        unsafe { self.driver.poll(timeout) }
311    }
312
313    /// Get the pushed operations from the completion entries.
314    ///
315    /// # Panics
316    /// This function will panic if the requested operation has not been
317    /// completed.
318    pub fn pop<T>(&mut self, op: Key<T>) -> PushEntry<Key<T>, (BufResult<usize, T>, u32)> {
319        instrument!(compio_log::Level::DEBUG, "pop", ?op);
320        if op.has_result() {
321            let flags = op.flags();
322            // SAFETY: completed.
323            PushEntry::Ready((unsafe { op.into_inner() }, flags))
324        } else {
325            PushEntry::Pending(op)
326        }
327    }
328
329    /// Update the waker of the specified op.
330    pub fn update_waker<T>(&mut self, op: &mut Key<T>, waker: Waker) {
331        op.set_waker(waker);
332    }
333
334    /// Create a notify handle to interrupt the inner driver.
335    pub fn handle(&self) -> NotifyHandle {
336        self.driver.handle()
337    }
338
339    /// Create buffer pool with given `buffer_size` and `buffer_len`
340    ///
341    /// # Notes
342    ///
343    /// If `buffer_len` is not a power of 2, it will be rounded up with
344    /// [`u16::next_power_of_two`].
345    pub fn create_buffer_pool(
346        &mut self,
347        buffer_len: u16,
348        buffer_size: usize,
349    ) -> io::Result<BufferPool> {
350        self.driver.create_buffer_pool(buffer_len, buffer_size)
351    }
352
353    /// Release the buffer pool
354    ///
355    /// # Safety
356    ///
357    /// Caller must make sure to release the buffer pool with the correct
358    /// driver, i.e., the one they created the buffer pool with.
359    pub unsafe fn release_buffer_pool(&mut self, buffer_pool: BufferPool) -> io::Result<()> {
360        self.driver.release_buffer_pool(buffer_pool)
361    }
362}
363
364impl AsRawFd for Proactor {
365    fn as_raw_fd(&self) -> RawFd {
366        self.driver.as_raw_fd()
367    }
368}
369
370/// An completed entry returned from kernel.
371#[derive(Debug)]
372pub(crate) struct Entry {
373    user_data: usize,
374    result: io::Result<usize>,
375    flags: u32,
376}
377
378impl Entry {
379    pub(crate) fn new(user_data: usize, result: io::Result<usize>) -> Self {
380        Self {
381            user_data,
382            result,
383            flags: 0,
384        }
385    }
386
387    #[cfg(io_uring)]
388    // this method only used by in io-uring driver
389    pub(crate) fn set_flags(&mut self, flags: u32) {
390        self.flags = flags;
391    }
392
393    /// The user-defined data returned by [`Proactor::push`].
394    pub fn user_data(&self) -> usize {
395        self.user_data
396    }
397
398    pub fn flags(&self) -> u32 {
399        self.flags
400    }
401
402    /// The result of the operation.
403    pub fn into_result(self) -> io::Result<usize> {
404        self.result
405    }
406
407    /// SAFETY: `user_data` should be a valid pointer.
408    pub unsafe fn notify(self) {
409        let user_data = self.user_data();
410        let mut op = Key::<()>::new_unchecked(user_data);
411        op.set_flags(self.flags());
412        if op.set_result(self.into_result()) {
413            // SAFETY: completed and cancelled.
414            let _ = op.into_box();
415        }
416    }
417}
418
419#[derive(Debug, Clone)]
420enum ThreadPoolBuilder {
421    Create { limit: usize, recv_limit: Duration },
422    Reuse(AsyncifyPool),
423}
424
425impl Default for ThreadPoolBuilder {
426    fn default() -> Self {
427        Self::new()
428    }
429}
430
431impl ThreadPoolBuilder {
432    pub fn new() -> Self {
433        Self::Create {
434            limit: 256,
435            recv_limit: Duration::from_secs(60),
436        }
437    }
438
439    pub fn create_or_reuse(&self) -> AsyncifyPool {
440        match self {
441            Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit),
442            Self::Reuse(pool) => pool.clone(),
443        }
444    }
445}
446
447/// Builder for [`Proactor`].
448#[derive(Debug, Clone)]
449pub struct ProactorBuilder {
450    capacity: u32,
451    pool_builder: ThreadPoolBuilder,
452    sqpoll_idle: Option<Duration>,
453    coop_taskrun: bool,
454    taskrun_flag: bool,
455    eventfd: Option<RawFd>,
456}
457
458impl Default for ProactorBuilder {
459    fn default() -> Self {
460        Self::new()
461    }
462}
463
464impl ProactorBuilder {
465    /// Create the builder with default config.
466    pub fn new() -> Self {
467        Self {
468            capacity: 1024,
469            pool_builder: ThreadPoolBuilder::new(),
470            sqpoll_idle: None,
471            coop_taskrun: false,
472            taskrun_flag: false,
473            eventfd: None,
474        }
475    }
476
477    /// Set the capacity of the inner event queue or submission queue, if
478    /// exists. The default value is 1024.
479    pub fn capacity(&mut self, capacity: u32) -> &mut Self {
480        self.capacity = capacity;
481        self
482    }
483
484    /// Set the thread number limit of the inner thread pool, if exists. The
485    /// default value is 256.
486    ///
487    /// It will be ignored if `reuse_thread_pool` is set.
488    pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
489        if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder {
490            *limit = value;
491        }
492        self
493    }
494
495    /// Set the waiting timeout of the inner thread, if exists. The default is
496    /// 60 seconds.
497    ///
498    /// It will be ignored if `reuse_thread_pool` is set.
499    pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
500        if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder {
501            *recv_limit = timeout;
502        }
503        self
504    }
505
506    /// Set to reuse an existing [`AsyncifyPool`] in this proactor.
507    pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
508        self.pool_builder = ThreadPoolBuilder::Reuse(pool);
509        self
510    }
511
512    /// Force reuse the thread pool for each proactor created by this builder,
513    /// even `reuse_thread_pool` is not set.
514    pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
515        self.reuse_thread_pool(self.create_or_get_thread_pool());
516        self
517    }
518
519    /// Create or reuse the thread pool from the config.
520    pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
521        self.pool_builder.create_or_reuse()
522    }
523
524    /// Set `io-uring` sqpoll idle milliseconds, when `sqpoll_idle` is set,
525    /// io-uring sqpoll feature will be enabled
526    ///
527    /// # Notes
528    ///
529    /// - Only effective when the `io-uring` feature is enabled
530    /// - `idle` must >= 1ms, otherwise will set sqpoll idle 0ms
531    /// - `idle` will be rounded down
532    pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self {
533        self.sqpoll_idle = Some(idle);
534        self
535    }
536
537    /// `coop_taskrun` feature has been available since Linux Kernel 5.19. This
538    /// will optimize performance for most cases, especially compio is a single
539    /// thread runtime.
540    ///
541    /// However, it can't run with sqpoll feature.
542    ///
543    /// # Notes
544    ///
545    /// - Only effective when the `io-uring` feature is enabled
546    pub fn coop_taskrun(&mut self, enable: bool) -> &mut Self {
547        self.coop_taskrun = enable;
548        self
549    }
550
551    /// `taskrun_flag` feature has been available since Linux Kernel 5.19. This
552    /// allows io-uring driver can know if any cqes are available when try to
553    /// push sqe to sq. This should be enabled with
554    /// [`coop_taskrun`](Self::coop_taskrun)
555    ///
556    /// # Notes
557    ///
558    /// - Only effective when the `io-uring` feature is enabled
559    pub fn taskrun_flag(&mut self, enable: bool) -> &mut Self {
560        self.taskrun_flag = enable;
561        self
562    }
563
564    /// Register an eventfd to io-uring.
565    ///
566    /// # Notes
567    ///
568    /// - Only effective when the `io-uring` feature is enabled
569    pub fn register_eventfd(&mut self, fd: RawFd) -> &mut Self {
570        self.eventfd = Some(fd);
571        self
572    }
573
574    /// Build the [`Proactor`].
575    pub fn build(&self) -> io::Result<Proactor> {
576        Proactor::with_builder(self)
577    }
578}