compio_driver/
lib.rs

1//! The platform-specified driver.
2//! Some types differ by compilation target.
3
4#![cfg_attr(docsrs, feature(doc_cfg))]
5#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
6#![warn(missing_docs)]
7
8use std::{
9    io,
10    task::{Poll, Waker},
11    time::Duration,
12};
13
14use compio_buf::BufResult;
15use compio_log::instrument;
16
17mod key;
18pub use key::Key;
19
20pub mod op;
21#[cfg(unix)]
22#[cfg_attr(docsrs, doc(cfg(all())))]
23mod unix;
24#[cfg(unix)]
25use unix::Overlapped;
26
27mod asyncify;
28pub use asyncify::*;
29
30mod fd;
31pub use fd::*;
32
33mod driver_type;
34pub use driver_type::*;
35
36mod buffer_pool;
37pub use buffer_pool::*;
38
39cfg_if::cfg_if! {
40    if #[cfg(windows)] {
41        #[path = "iocp/mod.rs"]
42        mod sys;
43    } else if #[cfg(fusion)] {
44        #[path = "fusion/mod.rs"]
45        mod sys;
46    } else if #[cfg(io_uring)] {
47        #[path = "iour/mod.rs"]
48        mod sys;
49    } else if #[cfg(all(target_os = "linux", not(feature = "polling")))] {
50        #[path = "stub/mod.rs"]
51        mod sys;
52    } else if #[cfg(unix)] {
53        #[path = "poll/mod.rs"]
54        mod sys;
55    }
56}
57
58pub use sys::*;
59
60#[cfg(windows)]
61#[macro_export]
62#[doc(hidden)]
63macro_rules! syscall {
64    (BOOL, $e:expr) => {
65        $crate::syscall!($e, == 0)
66    };
67    (SOCKET, $e:expr) => {
68        $crate::syscall!($e, != 0)
69    };
70    (HANDLE, $e:expr) => {
71        $crate::syscall!($e, == ::windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE)
72    };
73    ($e:expr, $op: tt $rhs: expr) => {{
74        #[allow(unused_unsafe)]
75        let res = unsafe { $e };
76        if res $op $rhs {
77            Err(::std::io::Error::last_os_error())
78        } else {
79            Ok(res)
80        }
81    }};
82}
83
84/// Helper macro to execute a system call
85#[cfg(unix)]
86#[macro_export]
87#[doc(hidden)]
88macro_rules! syscall {
89    (break $e:expr) => {
90        loop {
91            match $crate::syscall!($e) {
92                Ok(fd) => break ::std::task::Poll::Ready(Ok(fd as usize)),
93                Err(e) if e.kind() == ::std::io::ErrorKind::WouldBlock || e.raw_os_error() == Some(::libc::EINPROGRESS)
94                    => break ::std::task::Poll::Pending,
95                Err(e) if e.kind() == ::std::io::ErrorKind::Interrupted => {},
96                Err(e) => break ::std::task::Poll::Ready(Err(e)),
97            }
98        }
99    };
100    ($e:expr, $f:ident($fd:expr)) => {
101        match $crate::syscall!(break $e) {
102            ::std::task::Poll::Pending => Ok($crate::sys::Decision::$f($fd)),
103            ::std::task::Poll::Ready(Ok(res)) => Ok($crate::sys::Decision::Completed(res)),
104            ::std::task::Poll::Ready(Err(e)) => Err(e),
105        }
106    };
107    ($e:expr) => {{
108        #[allow(unused_unsafe)]
109        let res = unsafe { $e };
110        if res == -1 {
111            Err(::std::io::Error::last_os_error())
112        } else {
113            Ok(res)
114        }
115    }};
116}
117
118#[macro_export]
119#[doc(hidden)]
120macro_rules! impl_raw_fd {
121    ($t:ty, $it:ty, $inner:ident) => {
122        impl $crate::AsRawFd for $t {
123            fn as_raw_fd(&self) -> $crate::RawFd {
124                self.$inner.as_raw_fd()
125            }
126        }
127        #[cfg(unix)]
128        impl std::os::fd::AsFd for $t {
129            fn as_fd(&self) -> std::os::fd::BorrowedFd<'_> {
130                self.$inner.as_fd()
131            }
132        }
133        #[cfg(unix)]
134        impl std::os::fd::FromRawFd for $t {
135            unsafe fn from_raw_fd(fd: $crate::RawFd) -> Self {
136                Self {
137                    $inner: unsafe { std::os::fd::FromRawFd::from_raw_fd(fd) },
138                }
139            }
140        }
141        impl $crate::ToSharedFd<$it> for $t {
142            fn to_shared_fd(&self) -> $crate::SharedFd<$it> {
143                self.$inner.to_shared_fd()
144            }
145        }
146    };
147    ($t:ty, $it:ty, $inner:ident,file) => {
148        $crate::impl_raw_fd!($t, $it, $inner);
149        #[cfg(windows)]
150        impl std::os::windows::io::FromRawHandle for $t {
151            unsafe fn from_raw_handle(handle: std::os::windows::io::RawHandle) -> Self {
152                Self {
153                    $inner: unsafe { std::os::windows::io::FromRawHandle::from_raw_handle(handle) },
154                }
155            }
156        }
157        #[cfg(windows)]
158        impl std::os::windows::io::AsHandle for $t {
159            fn as_handle(&self) -> std::os::windows::io::BorrowedHandle {
160                self.$inner.as_handle()
161            }
162        }
163        #[cfg(windows)]
164        impl std::os::windows::io::AsRawHandle for $t {
165            fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
166                self.$inner.as_raw_handle()
167            }
168        }
169    };
170    ($t:ty, $it:ty, $inner:ident,socket) => {
171        $crate::impl_raw_fd!($t, $it, $inner);
172        #[cfg(windows)]
173        impl std::os::windows::io::FromRawSocket for $t {
174            unsafe fn from_raw_socket(sock: std::os::windows::io::RawSocket) -> Self {
175                Self {
176                    $inner: unsafe { std::os::windows::io::FromRawSocket::from_raw_socket(sock) },
177                }
178            }
179        }
180        #[cfg(windows)]
181        impl std::os::windows::io::AsSocket for $t {
182            fn as_socket(&self) -> std::os::windows::io::BorrowedSocket {
183                self.$inner.as_socket()
184            }
185        }
186        #[cfg(windows)]
187        impl std::os::windows::io::AsRawSocket for $t {
188            fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
189                self.$inner.as_raw_socket()
190            }
191        }
192    };
193}
194
195/// The return type of [`Proactor::push`].
196pub enum PushEntry<K, R> {
197    /// The operation is pushed to the submission queue.
198    Pending(K),
199    /// The operation is ready and returns.
200    Ready(R),
201}
202
203impl<K, R> PushEntry<K, R> {
204    /// Get if the current variant is [`PushEntry::Ready`].
205    pub const fn is_ready(&self) -> bool {
206        matches!(self, Self::Ready(_))
207    }
208
209    /// Take the ready variant if exists.
210    pub fn take_ready(self) -> Option<R> {
211        match self {
212            Self::Pending(_) => None,
213            Self::Ready(res) => Some(res),
214        }
215    }
216
217    /// Map the [`PushEntry::Pending`] branch.
218    pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
219        match self {
220            Self::Pending(k) => PushEntry::Pending(f(k)),
221            Self::Ready(r) => PushEntry::Ready(r),
222        }
223    }
224
225    /// Map the [`PushEntry::Ready`] branch.
226    pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
227        match self {
228            Self::Pending(k) => PushEntry::Pending(k),
229            Self::Ready(r) => PushEntry::Ready(f(r)),
230        }
231    }
232}
233
234/// Low-level actions of completion-based IO.
235/// It owns the operations to keep the driver safe.
236pub struct Proactor {
237    driver: Driver,
238}
239
240impl Proactor {
241    /// Create [`Proactor`] with 1024 entries.
242    pub fn new() -> io::Result<Self> {
243        Self::builder().build()
244    }
245
246    /// Create [`ProactorBuilder`] to config the proactor.
247    pub fn builder() -> ProactorBuilder {
248        ProactorBuilder::new()
249    }
250
251    fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
252        Ok(Self {
253            driver: Driver::new(builder)?,
254        })
255    }
256
257    /// The current driver type.
258    pub fn driver_type(&self) -> DriverType {
259        self.driver.driver_type()
260    }
261
262    /// Attach an fd to the driver.
263    ///
264    /// ## Platform specific
265    /// * IOCP: it will be attached to the completion port. An fd could only be
266    ///   attached to one driver, and could only be attached once, even if you
267    ///   `try_clone` it.
268    /// * io-uring & polling: it will do nothing but return `Ok(())`.
269    pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
270        self.driver.attach(fd)
271    }
272
273    /// Cancel an operation with the pushed user-defined data.
274    ///
275    /// The cancellation is not reliable. The underlying operation may continue,
276    /// but just don't return from [`Proactor::poll`]. Therefore, although an
277    /// operation is cancelled, you should not reuse its `user_data`.
278    pub fn cancel<T: OpCode>(&mut self, mut op: Key<T>) -> Option<BufResult<usize, T>> {
279        instrument!(compio_log::Level::DEBUG, "cancel", ?op);
280        if op.set_cancelled() {
281            // SAFETY: completed.
282            Some(unsafe { op.into_inner() })
283        } else {
284            self.driver
285                .cancel(&mut unsafe { Key::<dyn OpCode>::new_unchecked(op.user_data()) });
286            None
287        }
288    }
289
290    /// Push an operation into the driver, and return the unique key, called
291    /// user-defined data, associated with it.
292    pub fn push<T: OpCode + 'static>(&mut self, op: T) -> PushEntry<Key<T>, BufResult<usize, T>> {
293        let mut op = self.driver.create_op(op);
294        match self
295            .driver
296            .push(&mut unsafe { Key::<dyn OpCode>::new_unchecked(op.user_data()) })
297        {
298            Poll::Pending => PushEntry::Pending(op),
299            Poll::Ready(res) => {
300                op.set_result(res);
301                // SAFETY: just completed.
302                PushEntry::Ready(unsafe { op.into_inner() })
303            }
304        }
305    }
306
307    /// Poll the driver and get completed entries.
308    /// You need to call [`Proactor::pop`] to get the pushed
309    /// operations.
310    pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
311        self.driver.poll(timeout)
312    }
313
314    /// Get the pushed operations from the completion entries.
315    ///
316    /// # Panics
317    /// This function will panic if the requested operation has not been
318    /// completed.
319    pub fn pop<T>(&mut self, op: Key<T>) -> PushEntry<Key<T>, (BufResult<usize, T>, u32)> {
320        instrument!(compio_log::Level::DEBUG, "pop", ?op);
321        if op.has_result() {
322            let flags = op.flags();
323            // SAFETY: completed.
324            PushEntry::Ready((unsafe { op.into_inner() }, flags))
325        } else {
326            PushEntry::Pending(op)
327        }
328    }
329
330    /// Update the waker of the specified op.
331    pub fn update_waker<T>(&mut self, op: &mut Key<T>, waker: Waker) {
332        op.set_waker(waker);
333    }
334
335    /// Create a waker to interrupt the inner driver.
336    pub fn waker(&self) -> Waker {
337        self.driver.waker()
338    }
339
340    /// Create buffer pool with given `buffer_size` and `buffer_len`
341    ///
342    /// # Notes
343    ///
344    /// If `buffer_len` is not a power of 2, it will be rounded up with
345    /// [`u16::next_power_of_two`].
346    pub fn create_buffer_pool(
347        &mut self,
348        buffer_len: u16,
349        buffer_size: usize,
350    ) -> io::Result<BufferPool> {
351        self.driver.create_buffer_pool(buffer_len, buffer_size)
352    }
353
354    /// Release the buffer pool
355    ///
356    /// # Safety
357    ///
358    /// Caller must make sure to release the buffer pool with the correct
359    /// driver, i.e., the one they created the buffer pool with.
360    pub unsafe fn release_buffer_pool(&mut self, buffer_pool: BufferPool) -> io::Result<()> {
361        unsafe { self.driver.release_buffer_pool(buffer_pool) }
362    }
363}
364
365impl AsRawFd for Proactor {
366    fn as_raw_fd(&self) -> RawFd {
367        self.driver.as_raw_fd()
368    }
369}
370
371/// An completed entry returned from kernel.
372#[derive(Debug)]
373pub(crate) struct Entry {
374    user_data: usize,
375    result: io::Result<usize>,
376    flags: u32,
377}
378
379impl Entry {
380    pub(crate) fn new(user_data: usize, result: io::Result<usize>) -> Self {
381        Self {
382            user_data,
383            result,
384            flags: 0,
385        }
386    }
387
388    #[cfg(io_uring)]
389    // this method only used by in io-uring driver
390    pub(crate) fn set_flags(&mut self, flags: u32) {
391        self.flags = flags;
392    }
393
394    /// The user-defined data returned by [`Proactor::push`].
395    pub fn user_data(&self) -> usize {
396        self.user_data
397    }
398
399    pub fn flags(&self) -> u32 {
400        self.flags
401    }
402
403    /// The result of the operation.
404    pub fn into_result(self) -> io::Result<usize> {
405        self.result
406    }
407
408    /// # Safety
409    /// * `user_data` should be a valid pointer.
410    /// * Should only be called once.
411    pub unsafe fn notify(self) {
412        let user_data = self.user_data();
413        let mut op = unsafe { Key::<()>::new_unchecked(user_data) };
414        op.set_flags(self.flags());
415        if op.set_result(self.into_result()) {
416            // SAFETY: completed and cancelled.
417            let _ = unsafe { op.into_box() };
418        }
419    }
420}
421
422#[derive(Debug, Clone)]
423enum ThreadPoolBuilder {
424    Create { limit: usize, recv_limit: Duration },
425    Reuse(AsyncifyPool),
426}
427
428impl Default for ThreadPoolBuilder {
429    fn default() -> Self {
430        Self::new()
431    }
432}
433
434impl ThreadPoolBuilder {
435    pub fn new() -> Self {
436        Self::Create {
437            limit: 256,
438            recv_limit: Duration::from_secs(60),
439        }
440    }
441
442    pub fn create_or_reuse(&self) -> AsyncifyPool {
443        match self {
444            Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit),
445            Self::Reuse(pool) => pool.clone(),
446        }
447    }
448}
449
450/// Builder for [`Proactor`].
451#[derive(Debug, Clone)]
452pub struct ProactorBuilder {
453    capacity: u32,
454    pool_builder: ThreadPoolBuilder,
455    sqpoll_idle: Option<Duration>,
456    coop_taskrun: bool,
457    taskrun_flag: bool,
458    eventfd: Option<RawFd>,
459    driver_type: Option<DriverType>,
460}
461
462// SAFETY: `RawFd` is thread safe.
463unsafe impl Send for ProactorBuilder {}
464unsafe impl Sync for ProactorBuilder {}
465
466impl Default for ProactorBuilder {
467    fn default() -> Self {
468        Self::new()
469    }
470}
471
472impl ProactorBuilder {
473    /// Create the builder with default config.
474    pub fn new() -> Self {
475        Self {
476            capacity: 1024,
477            pool_builder: ThreadPoolBuilder::new(),
478            sqpoll_idle: None,
479            coop_taskrun: false,
480            taskrun_flag: false,
481            eventfd: None,
482            driver_type: None,
483        }
484    }
485
486    /// Set the capacity of the inner event queue or submission queue, if
487    /// exists. The default value is 1024.
488    pub fn capacity(&mut self, capacity: u32) -> &mut Self {
489        self.capacity = capacity;
490        self
491    }
492
493    /// Set the thread number limit of the inner thread pool, if exists. The
494    /// default value is 256.
495    ///
496    /// It will be ignored if `reuse_thread_pool` is set.
497    ///
498    /// Warning: some operations don't work if the limit is set to zero:
499    /// * `Asyncify` needs thread pool.
500    /// * Operations except `Recv*`, `Send*`, `Connect`, `Accept` may need
501    ///   thread pool.
502    pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
503        if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder {
504            *limit = value;
505        }
506        self
507    }
508
509    /// Set the waiting timeout of the inner thread, if exists. The default is
510    /// 60 seconds.
511    ///
512    /// It will be ignored if `reuse_thread_pool` is set.
513    pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
514        if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder {
515            *recv_limit = timeout;
516        }
517        self
518    }
519
520    /// Set to reuse an existing [`AsyncifyPool`] in this proactor.
521    pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
522        self.pool_builder = ThreadPoolBuilder::Reuse(pool);
523        self
524    }
525
526    /// Force reuse the thread pool for each proactor created by this builder,
527    /// even `reuse_thread_pool` is not set.
528    pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
529        self.reuse_thread_pool(self.create_or_get_thread_pool());
530        self
531    }
532
533    /// Create or reuse the thread pool from the config.
534    pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
535        self.pool_builder.create_or_reuse()
536    }
537
538    /// Set `io-uring` sqpoll idle milliseconds, when `sqpoll_idle` is set,
539    /// io-uring sqpoll feature will be enabled
540    ///
541    /// # Notes
542    ///
543    /// - Only effective when the `io-uring` feature is enabled
544    /// - `idle` must >= 1ms, otherwise will set sqpoll idle 0ms
545    /// - `idle` will be rounded down
546    pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self {
547        self.sqpoll_idle = Some(idle);
548        self
549    }
550
551    /// `coop_taskrun` feature has been available since Linux Kernel 5.19. This
552    /// will optimize performance for most cases, especially compio is a single
553    /// thread runtime.
554    ///
555    /// However, it can't run with sqpoll feature.
556    ///
557    /// # Notes
558    ///
559    /// - Only effective when the `io-uring` feature is enabled
560    pub fn coop_taskrun(&mut self, enable: bool) -> &mut Self {
561        self.coop_taskrun = enable;
562        self
563    }
564
565    /// `taskrun_flag` feature has been available since Linux Kernel 5.19. This
566    /// allows io-uring driver can know if any cqes are available when try to
567    /// push sqe to sq. This should be enabled with
568    /// [`coop_taskrun`](Self::coop_taskrun)
569    ///
570    /// # Notes
571    ///
572    /// - Only effective when the `io-uring` feature is enabled
573    pub fn taskrun_flag(&mut self, enable: bool) -> &mut Self {
574        self.taskrun_flag = enable;
575        self
576    }
577
578    /// Register an eventfd to io-uring.
579    ///
580    /// # Notes
581    ///
582    /// - Only effective when the `io-uring` feature is enabled
583    pub fn register_eventfd(&mut self, fd: RawFd) -> &mut Self {
584        self.eventfd = Some(fd);
585        self
586    }
587
588    /// Force a driver type to use. It is ignored if the fusion driver is
589    /// disabled.
590    pub fn driver_type(&mut self, t: DriverType) -> &mut Self {
591        self.driver_type = Some(t);
592        self
593    }
594
595    /// Build the [`Proactor`].
596    pub fn build(&self) -> io::Result<Proactor> {
597        Proactor::with_builder(self)
598    }
599}