compio_driver/
lib.rs

1//! The platform-specified driver.
2//! Some types differ by compilation target.
3
4#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
5#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
6#![warn(missing_docs)]
7
8#[cfg(all(
9    target_os = "linux",
10    not(feature = "io-uring"),
11    not(feature = "polling")
12))]
13compile_error!("You must choose at least one of these features: [\"io-uring\", \"polling\"]");
14
15use std::{
16    io,
17    task::{Poll, Waker},
18    time::Duration,
19};
20
21use compio_buf::BufResult;
22use compio_log::instrument;
23
24mod key;
25pub use key::Key;
26
27pub mod op;
28#[cfg(unix)]
29#[cfg_attr(docsrs, doc(cfg(all())))]
30mod unix;
31#[cfg(unix)]
32use unix::Overlapped;
33
34mod asyncify;
35pub use asyncify::*;
36
37mod fd;
38pub use fd::*;
39
40mod driver_type;
41pub use driver_type::*;
42
43mod buffer_pool;
44pub use buffer_pool::*;
45
46cfg_if::cfg_if! {
47    if #[cfg(windows)] {
48        #[path = "iocp/mod.rs"]
49        mod sys;
50    } else if #[cfg(fusion)] {
51        #[path = "fusion/mod.rs"]
52        mod sys;
53    } else if #[cfg(io_uring)] {
54        #[path = "iour/mod.rs"]
55        mod sys;
56    } else if #[cfg(unix)] {
57        #[path = "poll/mod.rs"]
58        mod sys;
59    }
60}
61
62pub use sys::*;
63
64#[cfg(windows)]
65#[macro_export]
66#[doc(hidden)]
67macro_rules! syscall {
68    (BOOL, $e:expr) => {
69        $crate::syscall!($e, == 0)
70    };
71    (SOCKET, $e:expr) => {
72        $crate::syscall!($e, != 0)
73    };
74    (HANDLE, $e:expr) => {
75        $crate::syscall!($e, == ::windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE)
76    };
77    ($e:expr, $op: tt $rhs: expr) => {{
78        #[allow(unused_unsafe)]
79        let res = unsafe { $e };
80        if res $op $rhs {
81            Err(::std::io::Error::last_os_error())
82        } else {
83            Ok(res)
84        }
85    }};
86}
87
88/// Helper macro to execute a system call
89#[cfg(unix)]
90#[macro_export]
91#[doc(hidden)]
92macro_rules! syscall {
93    (break $e:expr) => {
94        loop {
95            match $crate::syscall!($e) {
96                Ok(fd) => break ::std::task::Poll::Ready(Ok(fd as usize)),
97                Err(e) if e.kind() == ::std::io::ErrorKind::WouldBlock || e.raw_os_error() == Some(::libc::EINPROGRESS)
98                    => break ::std::task::Poll::Pending,
99                Err(e) if e.kind() == ::std::io::ErrorKind::Interrupted => {},
100                Err(e) => break ::std::task::Poll::Ready(Err(e)),
101            }
102        }
103    };
104    ($e:expr, $f:ident($fd:expr)) => {
105        match $crate::syscall!(break $e) {
106            ::std::task::Poll::Pending => Ok($crate::sys::Decision::$f($fd)),
107            ::std::task::Poll::Ready(Ok(res)) => Ok($crate::sys::Decision::Completed(res)),
108            ::std::task::Poll::Ready(Err(e)) => Err(e),
109        }
110    };
111    ($e:expr) => {{
112        #[allow(unused_unsafe)]
113        let res = unsafe { $e };
114        if res == -1 {
115            Err(::std::io::Error::last_os_error())
116        } else {
117            Ok(res)
118        }
119    }};
120}
121
122#[macro_export]
123#[doc(hidden)]
124macro_rules! impl_raw_fd {
125    ($t:ty, $it:ty, $inner:ident) => {
126        impl $crate::AsRawFd for $t {
127            fn as_raw_fd(&self) -> $crate::RawFd {
128                self.$inner.as_raw_fd()
129            }
130        }
131        #[cfg(unix)]
132        impl std::os::fd::AsFd for $t {
133            fn as_fd(&self) -> std::os::fd::BorrowedFd<'_> {
134                self.$inner.as_fd()
135            }
136        }
137        #[cfg(unix)]
138        impl std::os::fd::FromRawFd for $t {
139            unsafe fn from_raw_fd(fd: $crate::RawFd) -> Self {
140                Self {
141                    $inner: std::os::fd::FromRawFd::from_raw_fd(fd),
142                }
143            }
144        }
145        impl $crate::ToSharedFd<$it> for $t {
146            fn to_shared_fd(&self) -> $crate::SharedFd<$it> {
147                self.$inner.to_shared_fd()
148            }
149        }
150    };
151    ($t:ty, $it:ty, $inner:ident,file) => {
152        $crate::impl_raw_fd!($t, $it, $inner);
153        #[cfg(windows)]
154        impl std::os::windows::io::FromRawHandle for $t {
155            unsafe fn from_raw_handle(handle: std::os::windows::io::RawHandle) -> Self {
156                Self {
157                    $inner: std::os::windows::io::FromRawHandle::from_raw_handle(handle),
158                }
159            }
160        }
161        #[cfg(windows)]
162        impl std::os::windows::io::AsHandle for $t {
163            fn as_handle(&self) -> std::os::windows::io::BorrowedHandle {
164                self.$inner.as_handle()
165            }
166        }
167        #[cfg(windows)]
168        impl std::os::windows::io::AsRawHandle for $t {
169            fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
170                self.$inner.as_raw_handle()
171            }
172        }
173    };
174    ($t:ty, $it:ty, $inner:ident,socket) => {
175        $crate::impl_raw_fd!($t, $it, $inner);
176        #[cfg(windows)]
177        impl std::os::windows::io::FromRawSocket for $t {
178            unsafe fn from_raw_socket(sock: std::os::windows::io::RawSocket) -> Self {
179                Self {
180                    $inner: std::os::windows::io::FromRawSocket::from_raw_socket(sock),
181                }
182            }
183        }
184        #[cfg(windows)]
185        impl std::os::windows::io::AsSocket for $t {
186            fn as_socket(&self) -> std::os::windows::io::BorrowedSocket {
187                self.$inner.as_socket()
188            }
189        }
190        #[cfg(windows)]
191        impl std::os::windows::io::AsRawSocket for $t {
192            fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
193                self.$inner.as_raw_socket()
194            }
195        }
196    };
197}
198
199/// The return type of [`Proactor::push`].
200pub enum PushEntry<K, R> {
201    /// The operation is pushed to the submission queue.
202    Pending(K),
203    /// The operation is ready and returns.
204    Ready(R),
205}
206
207impl<K, R> PushEntry<K, R> {
208    /// Get if the current variant is [`PushEntry::Ready`].
209    pub const fn is_ready(&self) -> bool {
210        matches!(self, Self::Ready(_))
211    }
212
213    /// Take the ready variant if exists.
214    pub fn take_ready(self) -> Option<R> {
215        match self {
216            Self::Pending(_) => None,
217            Self::Ready(res) => Some(res),
218        }
219    }
220
221    /// Map the [`PushEntry::Pending`] branch.
222    pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
223        match self {
224            Self::Pending(k) => PushEntry::Pending(f(k)),
225            Self::Ready(r) => PushEntry::Ready(r),
226        }
227    }
228
229    /// Map the [`PushEntry::Ready`] branch.
230    pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
231        match self {
232            Self::Pending(k) => PushEntry::Pending(k),
233            Self::Ready(r) => PushEntry::Ready(f(r)),
234        }
235    }
236}
237
238/// Low-level actions of completion-based IO.
239/// It owns the operations to keep the driver safe.
240pub struct Proactor {
241    driver: Driver,
242}
243
244impl Proactor {
245    /// Create [`Proactor`] with 1024 entries.
246    pub fn new() -> io::Result<Self> {
247        Self::builder().build()
248    }
249
250    /// Create [`ProactorBuilder`] to config the proactor.
251    pub fn builder() -> ProactorBuilder {
252        ProactorBuilder::new()
253    }
254
255    fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
256        Ok(Self {
257            driver: Driver::new(builder)?,
258        })
259    }
260
261    /// The current driver type.
262    pub fn driver_type(&self) -> DriverType {
263        self.driver.driver_type()
264    }
265
266    /// Attach an fd to the driver.
267    ///
268    /// ## Platform specific
269    /// * IOCP: it will be attached to the completion port. An fd could only be
270    ///   attached to one driver, and could only be attached once, even if you
271    ///   `try_clone` it.
272    /// * io-uring & polling: it will do nothing but return `Ok(())`.
273    pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
274        self.driver.attach(fd)
275    }
276
277    /// Cancel an operation with the pushed user-defined data.
278    ///
279    /// The cancellation is not reliable. The underlying operation may continue,
280    /// but just don't return from [`Proactor::poll`]. Therefore, although an
281    /// operation is cancelled, you should not reuse its `user_data`.
282    pub fn cancel<T: OpCode>(&mut self, mut op: Key<T>) -> Option<BufResult<usize, T>> {
283        instrument!(compio_log::Level::DEBUG, "cancel", ?op);
284        if op.set_cancelled() {
285            // SAFETY: completed.
286            Some(unsafe { op.into_inner() })
287        } else {
288            self.driver
289                .cancel(&mut unsafe { Key::<dyn OpCode>::new_unchecked(op.user_data()) });
290            None
291        }
292    }
293
294    /// Push an operation into the driver, and return the unique key, called
295    /// user-defined data, associated with it.
296    pub fn push<T: OpCode + 'static>(&mut self, op: T) -> PushEntry<Key<T>, BufResult<usize, T>> {
297        let mut op = self.driver.create_op(op);
298        match self
299            .driver
300            .push(&mut unsafe { Key::<dyn OpCode>::new_unchecked(op.user_data()) })
301        {
302            Poll::Pending => PushEntry::Pending(op),
303            Poll::Ready(res) => {
304                op.set_result(res);
305                // SAFETY: just completed.
306                PushEntry::Ready(unsafe { op.into_inner() })
307            }
308        }
309    }
310
311    /// Poll the driver and get completed entries.
312    /// You need to call [`Proactor::pop`] to get the pushed
313    /// operations.
314    pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
315        unsafe { self.driver.poll(timeout) }
316    }
317
318    /// Get the pushed operations from the completion entries.
319    ///
320    /// # Panics
321    /// This function will panic if the requested operation has not been
322    /// completed.
323    pub fn pop<T>(&mut self, op: Key<T>) -> PushEntry<Key<T>, (BufResult<usize, T>, u32)> {
324        instrument!(compio_log::Level::DEBUG, "pop", ?op);
325        if op.has_result() {
326            let flags = op.flags();
327            // SAFETY: completed.
328            PushEntry::Ready((unsafe { op.into_inner() }, flags))
329        } else {
330            PushEntry::Pending(op)
331        }
332    }
333
334    /// Update the waker of the specified op.
335    pub fn update_waker<T>(&mut self, op: &mut Key<T>, waker: Waker) {
336        op.set_waker(waker);
337    }
338
339    /// Create a notify handle to interrupt the inner driver.
340    pub fn handle(&self) -> NotifyHandle {
341        self.driver.handle()
342    }
343
344    /// Create buffer pool with given `buffer_size` and `buffer_len`
345    ///
346    /// # Notes
347    ///
348    /// If `buffer_len` is not a power of 2, it will be rounded up with
349    /// [`u16::next_power_of_two`].
350    pub fn create_buffer_pool(
351        &mut self,
352        buffer_len: u16,
353        buffer_size: usize,
354    ) -> io::Result<BufferPool> {
355        self.driver.create_buffer_pool(buffer_len, buffer_size)
356    }
357
358    /// Release the buffer pool
359    ///
360    /// # Safety
361    ///
362    /// Caller must make sure to release the buffer pool with the correct
363    /// driver, i.e., the one they created the buffer pool with.
364    pub unsafe fn release_buffer_pool(&mut self, buffer_pool: BufferPool) -> io::Result<()> {
365        self.driver.release_buffer_pool(buffer_pool)
366    }
367}
368
369impl AsRawFd for Proactor {
370    fn as_raw_fd(&self) -> RawFd {
371        self.driver.as_raw_fd()
372    }
373}
374
375/// An completed entry returned from kernel.
376#[derive(Debug)]
377pub(crate) struct Entry {
378    user_data: usize,
379    result: io::Result<usize>,
380    flags: u32,
381}
382
383impl Entry {
384    pub(crate) fn new(user_data: usize, result: io::Result<usize>) -> Self {
385        Self {
386            user_data,
387            result,
388            flags: 0,
389        }
390    }
391
392    #[cfg(io_uring)]
393    // this method only used by in io-uring driver
394    pub(crate) fn set_flags(&mut self, flags: u32) {
395        self.flags = flags;
396    }
397
398    /// The user-defined data returned by [`Proactor::push`].
399    pub fn user_data(&self) -> usize {
400        self.user_data
401    }
402
403    pub fn flags(&self) -> u32 {
404        self.flags
405    }
406
407    /// The result of the operation.
408    pub fn into_result(self) -> io::Result<usize> {
409        self.result
410    }
411
412    /// SAFETY: `user_data` should be a valid pointer.
413    pub unsafe fn notify(self) {
414        let user_data = self.user_data();
415        let mut op = Key::<()>::new_unchecked(user_data);
416        op.set_flags(self.flags());
417        if op.set_result(self.into_result()) {
418            // SAFETY: completed and cancelled.
419            let _ = op.into_box();
420        }
421    }
422}
423
424#[derive(Debug, Clone)]
425enum ThreadPoolBuilder {
426    Create { limit: usize, recv_limit: Duration },
427    Reuse(AsyncifyPool),
428}
429
430impl Default for ThreadPoolBuilder {
431    fn default() -> Self {
432        Self::new()
433    }
434}
435
436impl ThreadPoolBuilder {
437    pub fn new() -> Self {
438        Self::Create {
439            limit: 256,
440            recv_limit: Duration::from_secs(60),
441        }
442    }
443
444    pub fn create_or_reuse(&self) -> AsyncifyPool {
445        match self {
446            Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit),
447            Self::Reuse(pool) => pool.clone(),
448        }
449    }
450}
451
452/// Builder for [`Proactor`].
453#[derive(Debug, Clone)]
454pub struct ProactorBuilder {
455    capacity: u32,
456    pool_builder: ThreadPoolBuilder,
457    sqpoll_idle: Option<Duration>,
458    coop_taskrun: bool,
459    taskrun_flag: bool,
460    eventfd: Option<RawFd>,
461    driver_type: Option<DriverType>,
462}
463
464// Safety: `RawFd` is thread safe.
465unsafe impl Send for ProactorBuilder {}
466unsafe impl Sync for ProactorBuilder {}
467
468impl Default for ProactorBuilder {
469    fn default() -> Self {
470        Self::new()
471    }
472}
473
474impl ProactorBuilder {
475    /// Create the builder with default config.
476    pub fn new() -> Self {
477        Self {
478            capacity: 1024,
479            pool_builder: ThreadPoolBuilder::new(),
480            sqpoll_idle: None,
481            coop_taskrun: false,
482            taskrun_flag: false,
483            eventfd: None,
484            driver_type: None,
485        }
486    }
487
488    /// Set the capacity of the inner event queue or submission queue, if
489    /// exists. The default value is 1024.
490    pub fn capacity(&mut self, capacity: u32) -> &mut Self {
491        self.capacity = capacity;
492        self
493    }
494
495    /// Set the thread number limit of the inner thread pool, if exists. The
496    /// default value is 256.
497    ///
498    /// It will be ignored if `reuse_thread_pool` is set.
499    ///
500    /// Warning: some operations don't work if the limit is set to zero:
501    /// * `Asyncify` needs thread pool.
502    /// * Operations except `Recv*`, `Send*`, `Connect`, `Accept` may need
503    ///   thread pool.
504    pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
505        if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder {
506            *limit = value;
507        }
508        self
509    }
510
511    /// Set the waiting timeout of the inner thread, if exists. The default is
512    /// 60 seconds.
513    ///
514    /// It will be ignored if `reuse_thread_pool` is set.
515    pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
516        if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder {
517            *recv_limit = timeout;
518        }
519        self
520    }
521
522    /// Set to reuse an existing [`AsyncifyPool`] in this proactor.
523    pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
524        self.pool_builder = ThreadPoolBuilder::Reuse(pool);
525        self
526    }
527
528    /// Force reuse the thread pool for each proactor created by this builder,
529    /// even `reuse_thread_pool` is not set.
530    pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
531        self.reuse_thread_pool(self.create_or_get_thread_pool());
532        self
533    }
534
535    /// Create or reuse the thread pool from the config.
536    pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
537        self.pool_builder.create_or_reuse()
538    }
539
540    /// Set `io-uring` sqpoll idle milliseconds, when `sqpoll_idle` is set,
541    /// io-uring sqpoll feature will be enabled
542    ///
543    /// # Notes
544    ///
545    /// - Only effective when the `io-uring` feature is enabled
546    /// - `idle` must >= 1ms, otherwise will set sqpoll idle 0ms
547    /// - `idle` will be rounded down
548    pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self {
549        self.sqpoll_idle = Some(idle);
550        self
551    }
552
553    /// `coop_taskrun` feature has been available since Linux Kernel 5.19. This
554    /// will optimize performance for most cases, especially compio is a single
555    /// thread runtime.
556    ///
557    /// However, it can't run with sqpoll feature.
558    ///
559    /// # Notes
560    ///
561    /// - Only effective when the `io-uring` feature is enabled
562    pub fn coop_taskrun(&mut self, enable: bool) -> &mut Self {
563        self.coop_taskrun = enable;
564        self
565    }
566
567    /// `taskrun_flag` feature has been available since Linux Kernel 5.19. This
568    /// allows io-uring driver can know if any cqes are available when try to
569    /// push sqe to sq. This should be enabled with
570    /// [`coop_taskrun`](Self::coop_taskrun)
571    ///
572    /// # Notes
573    ///
574    /// - Only effective when the `io-uring` feature is enabled
575    pub fn taskrun_flag(&mut self, enable: bool) -> &mut Self {
576        self.taskrun_flag = enable;
577        self
578    }
579
580    /// Register an eventfd to io-uring.
581    ///
582    /// # Notes
583    ///
584    /// - Only effective when the `io-uring` feature is enabled
585    pub fn register_eventfd(&mut self, fd: RawFd) -> &mut Self {
586        self.eventfd = Some(fd);
587        self
588    }
589
590    /// Force a driver type to use. It is ignored if the fusion driver is
591    /// disabled.
592    pub fn driver_type(&mut self, t: DriverType) -> &mut Self {
593        self.driver_type = Some(t);
594        self
595    }
596
597    /// Build the [`Proactor`].
598    pub fn build(&self) -> io::Result<Proactor> {
599        Proactor::with_builder(self)
600    }
601}