Skip to main content

compio_driver/
lib.rs

1//! Platform-specific drivers.
2//!
3//! Some types differ by compilation target.
4
5#![cfg_attr(docsrs, feature(doc_cfg))]
6#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
7#![warn(missing_docs)]
8#![deny(rustdoc::broken_intra_doc_links)]
9#![doc(
10    html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
11)]
12#![doc(
13    html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
14)]
15
16use std::{
17    io,
18    task::{Poll, Waker},
19    time::Duration,
20};
21
22use compio_buf::BufResult;
23use compio_log::instrument;
24
25mod macros;
26
27mod key;
28pub use key::Key;
29
30mod asyncify;
31pub use asyncify::*;
32
33pub mod op;
34
35mod fd;
36pub use fd::*;
37
38mod driver_type;
39pub use driver_type::*;
40
41mod buffer_pool;
42pub use buffer_pool::*;
43
44mod sys;
45pub use sys::*;
46
47use crate::key::ErasedKey;
48
49mod sys_slice;
50
51/// The return type of [`Proactor::push`].
52#[derive(Debug)]
53pub enum PushEntry<K, R> {
54    /// The operation is pushed to the submission queue.
55    Pending(K),
56    /// The operation is ready and returns.
57    Ready(R),
58}
59
60impl<K, R> PushEntry<K, R> {
61    /// Get if the current variant is [`PushEntry::Ready`].
62    pub const fn is_ready(&self) -> bool {
63        matches!(self, Self::Ready(_))
64    }
65
66    /// Take the ready variant if exists.
67    pub fn take_ready(self) -> Option<R> {
68        match self {
69            Self::Pending(_) => None,
70            Self::Ready(res) => Some(res),
71        }
72    }
73
74    /// Map the [`PushEntry::Pending`] branch.
75    pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
76        match self {
77            Self::Pending(k) => PushEntry::Pending(f(k)),
78            Self::Ready(r) => PushEntry::Ready(r),
79        }
80    }
81
82    /// Map the [`PushEntry::Ready`] branch.
83    pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
84        match self {
85            Self::Pending(k) => PushEntry::Pending(k),
86            Self::Ready(r) => PushEntry::Ready(f(r)),
87        }
88    }
89}
90
91/// Low-level actions of completion-based IO.
92/// It owns the operations to keep the driver safe.
93pub struct Proactor {
94    driver: Driver,
95}
96
97impl Proactor {
98    /// Create [`Proactor`] with 1024 entries.
99    pub fn new() -> io::Result<Self> {
100        Self::builder().build()
101    }
102
103    /// Create [`ProactorBuilder`] to config the proactor.
104    pub fn builder() -> ProactorBuilder {
105        ProactorBuilder::new()
106    }
107
108    fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
109        Ok(Self {
110            driver: Driver::new(builder)?,
111        })
112    }
113
114    /// Get a default [`Extra`] for underlying driver.
115    pub fn default_extra(&self) -> Extra {
116        self.driver.default_extra().into()
117    }
118
119    /// The current driver type.
120    pub fn driver_type(&self) -> DriverType {
121        self.driver.driver_type()
122    }
123
124    /// Attach an fd to the driver.
125    ///
126    /// ## Platform specific
127    /// * IOCP: it will be attached to the completion port. An fd could only be
128    ///   attached to one driver, and could only be attached once, even if you
129    ///   `try_clone` it.
130    /// * io-uring & polling: it will do nothing but return `Ok(())`.
131    pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
132        self.driver.attach(fd)
133    }
134
135    /// Cancel an operation with the pushed [`Key`].
136    ///
137    /// The cancellation is not reliable. The underlying operation may continue,
138    /// but just don't return from [`Proactor::poll`].
139    pub fn cancel<T: OpCode>(&mut self, key: Key<T>) -> Option<BufResult<usize, T>> {
140        instrument!(compio_log::Level::DEBUG, "cancel", ?key);
141        key.set_cancelled();
142        if key.has_result() {
143            Some(key.take_result())
144        } else {
145            self.driver.cancel(key);
146            None
147        }
148    }
149
150    /// Push an operation into the driver, and return the unique key [`Key`],
151    /// associated with it.
152    pub fn push<T: sys::OpCode + 'static>(
153        &mut self,
154        op: T,
155    ) -> PushEntry<Key<T>, BufResult<usize, T>> {
156        self.push_with_extra(op, self.default_extra())
157    }
158
159    /// Push an operation into the driver with user-defined [`Extra`], and
160    /// return the unique key [`Key`], associated with it.
161    pub fn push_with_extra<T: sys::OpCode + 'static>(
162        &mut self,
163        op: T,
164        extra: Extra,
165    ) -> PushEntry<Key<T>, BufResult<usize, T>> {
166        let key = Key::new(op, extra);
167        match self.driver.push(key.clone().erase()) {
168            Poll::Pending => PushEntry::Pending(key),
169            Poll::Ready(res) => {
170                key.set_result(res);
171                PushEntry::Ready(key.take_result())
172            }
173        }
174    }
175
176    /// Poll the driver and get completed entries.
177    /// You need to call [`Proactor::pop`] to get the pushed
178    /// operations.
179    pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
180        self.driver.poll(timeout)
181    }
182
183    /// Get the pushed operations from the completion entries.
184    ///
185    /// # Panics
186    /// This function will panic if the requested operation has not been
187    /// completed.
188    pub fn pop<T>(&mut self, key: Key<T>) -> PushEntry<Key<T>, BufResult<usize, T>> {
189        instrument!(compio_log::Level::DEBUG, "pop", ?key);
190        if key.has_result() {
191            PushEntry::Ready(key.take_result())
192        } else {
193            PushEntry::Pending(key)
194        }
195    }
196
197    /// Get the pushed operations from the completion entries along the
198    /// [`Extra`] associated.
199    ///
200    /// # Panics
201    /// This function will panic if the requested operation has not been
202    /// completed.
203    pub fn pop_with_extra<T>(
204        &mut self,
205        key: Key<T>,
206    ) -> PushEntry<Key<T>, (BufResult<usize, T>, Extra)> {
207        instrument!(compio_log::Level::DEBUG, "pop", ?key);
208        if key.has_result() {
209            let extra = key.swap_extra(self.default_extra());
210            let res = key.take_result();
211            PushEntry::Ready((res, extra))
212        } else {
213            PushEntry::Pending(key)
214        }
215    }
216
217    /// Update the waker of the specified op.
218    pub fn update_waker<T>(&mut self, op: &mut Key<T>, waker: &Waker) {
219        op.set_waker(waker);
220    }
221
222    /// Create a waker to interrupt the inner driver.
223    pub fn waker(&self) -> Waker {
224        self.driver.waker()
225    }
226
227    /// Create buffer pool with given `buffer_size` and `buffer_len`
228    ///
229    /// # Notes
230    ///
231    /// If `buffer_len` is not a power of 2, it will be rounded up with
232    /// [`u16::next_power_of_two`].
233    pub fn create_buffer_pool(
234        &mut self,
235        buffer_len: u16,
236        buffer_size: usize,
237    ) -> io::Result<BufferPool> {
238        self.driver.create_buffer_pool(buffer_len, buffer_size)
239    }
240
241    /// Release the buffer pool
242    ///
243    /// # Safety
244    ///
245    /// Caller must make sure to release the buffer pool with the correct
246    /// driver, i.e., the one they created the buffer pool with.
247    pub unsafe fn release_buffer_pool(&mut self, buffer_pool: BufferPool) -> io::Result<()> {
248        unsafe { self.driver.release_buffer_pool(buffer_pool) }
249    }
250
251    /// Register a new personality in io-uring driver.
252    ///
253    /// Returns the personality id, which can be used with
254    /// [`Extra::set_personality`] to set the personality for an operation.
255    ///
256    /// This only works on `io_uring` driver. It will return an [`Unsupported`]
257    /// error on other drivers. See [`Submitter::register_personality`] for
258    /// more.
259    ///
260    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
261    /// [`Submitter::register_personality`]: https://docs.rs/io-uring/latest/io_uring/struct.Submitter.html#method.register_personality
262    pub fn register_personality(&self) -> io::Result<u16> {
263        fn unsupported() -> io::Error {
264            io::Error::new(
265                io::ErrorKind::Unsupported,
266                "Personality is only supported on io-uring driver",
267            )
268        }
269
270        #[cfg(io_uring)]
271        match self.driver.as_iour() {
272            Some(iour) => iour.register_personality(),
273            None => Err(unsupported()),
274        }
275
276        #[cfg(not(io_uring))]
277        Err(unsupported())
278    }
279
280    /// Unregister the given personality in io-uring driver.
281    ///
282    /// This only works on `io_uring` driver. It will return an [`Unsupported`]
283    /// error on other drivers. See [`Submitter::unregister_personality`] for
284    /// more.
285    ///
286    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
287    /// [`Submitter::unregister_personality`]: https://docs.rs/io-uring/latest/io_uring/struct.Submitter.html#method.unregister_personality
288    pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
289        fn unsupported(_: u16) -> io::Error {
290            io::Error::new(
291                io::ErrorKind::Unsupported,
292                "Personality is only supported on io-uring driver",
293            )
294        }
295
296        #[cfg(io_uring)]
297        match self.driver.as_iour() {
298            Some(iour) => iour.unregister_personality(personality),
299            None => Err(unsupported(personality)),
300        }
301
302        #[cfg(not(io_uring))]
303        Err(unsupported(personality))
304    }
305}
306
307impl AsRawFd for Proactor {
308    fn as_raw_fd(&self) -> RawFd {
309        self.driver.as_raw_fd()
310    }
311}
312
313/// An completed entry returned from kernel.
314///
315/// This represents the ownership of [`Key`] passed into the kernel is given
316/// back from it to the driver.
317#[derive(Debug)]
318pub(crate) struct Entry {
319    key: ErasedKey,
320    result: io::Result<usize>,
321
322    #[cfg(io_uring)]
323    flags: u32,
324}
325
326unsafe impl Send for Entry {}
327unsafe impl Sync for Entry {}
328
329impl Entry {
330    pub(crate) fn new(key: ErasedKey, result: io::Result<usize>) -> Self {
331        #[cfg(not(io_uring))]
332        {
333            Self { key, result }
334        }
335        #[cfg(io_uring)]
336        {
337            Self {
338                key,
339                result,
340                flags: 0,
341            }
342        }
343    }
344
345    #[allow(dead_code)]
346    pub fn user_data(&self) -> usize {
347        self.key.as_raw()
348    }
349
350    #[allow(dead_code)]
351    pub fn into_key(self) -> ErasedKey {
352        self.key
353    }
354
355    #[cfg(io_uring)]
356    pub fn flags(&self) -> u32 {
357        self.flags
358    }
359
360    #[cfg(io_uring)]
361    // this method only used by in io-uring driver
362    pub(crate) fn set_flags(&mut self, flags: u32) {
363        self.flags = flags;
364    }
365
366    pub fn notify(self) {
367        #[cfg(io_uring)]
368        self.key.borrow().extra_mut().set_flags(self.flags());
369        self.key.set_result(self.result);
370    }
371}
372
373#[derive(Debug, Clone)]
374enum ThreadPoolBuilder {
375    Create { limit: usize, recv_limit: Duration },
376    Reuse(AsyncifyPool),
377}
378
379impl Default for ThreadPoolBuilder {
380    fn default() -> Self {
381        Self::new()
382    }
383}
384
385impl ThreadPoolBuilder {
386    pub fn new() -> Self {
387        Self::Create {
388            limit: 256,
389            recv_limit: Duration::from_secs(60),
390        }
391    }
392
393    pub fn create_or_reuse(&self) -> AsyncifyPool {
394        match self {
395            Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit),
396            Self::Reuse(pool) => pool.clone(),
397        }
398    }
399}
400
401/// Builder for [`Proactor`].
402#[derive(Debug, Clone)]
403pub struct ProactorBuilder {
404    capacity: u32,
405    pool_builder: ThreadPoolBuilder,
406    sqpoll_idle: Option<Duration>,
407    coop_taskrun: bool,
408    taskrun_flag: bool,
409    eventfd: Option<RawFd>,
410    driver_type: Option<DriverType>,
411}
412
413// SAFETY: `RawFd` is thread safe.
414unsafe impl Send for ProactorBuilder {}
415unsafe impl Sync for ProactorBuilder {}
416
417impl Default for ProactorBuilder {
418    fn default() -> Self {
419        Self::new()
420    }
421}
422
423impl ProactorBuilder {
424    /// Create the builder with default config.
425    pub fn new() -> Self {
426        Self {
427            capacity: 1024,
428            pool_builder: ThreadPoolBuilder::new(),
429            sqpoll_idle: None,
430            coop_taskrun: false,
431            taskrun_flag: false,
432            eventfd: None,
433            driver_type: None,
434        }
435    }
436
437    /// Set the capacity of the inner event queue or submission queue, if
438    /// exists. The default value is 1024.
439    pub fn capacity(&mut self, capacity: u32) -> &mut Self {
440        self.capacity = capacity;
441        self
442    }
443
444    /// Set the thread number limit of the inner thread pool, if exists. The
445    /// default value is 256.
446    ///
447    /// It will be ignored if `reuse_thread_pool` is set.
448    ///
449    /// Warning: some operations don't work if the limit is set to zero:
450    /// * `Asyncify` needs thread pool.
451    /// * Operations except `Recv*`, `Send*`, `Connect`, `Accept` may need
452    ///   thread pool.
453    pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
454        if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder {
455            *limit = value;
456        }
457        self
458    }
459
460    /// Set the waiting timeout of the inner thread, if exists. The default is
461    /// 60 seconds.
462    ///
463    /// It will be ignored if `reuse_thread_pool` is set.
464    pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
465        if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder {
466            *recv_limit = timeout;
467        }
468        self
469    }
470
471    /// Set to reuse an existing [`AsyncifyPool`] in this proactor.
472    pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
473        self.pool_builder = ThreadPoolBuilder::Reuse(pool);
474        self
475    }
476
477    /// Force reuse the thread pool for each proactor created by this builder,
478    /// even `reuse_thread_pool` is not set.
479    pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
480        self.reuse_thread_pool(self.create_or_get_thread_pool());
481        self
482    }
483
484    /// Create or reuse the thread pool from the config.
485    pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
486        self.pool_builder.create_or_reuse()
487    }
488
489    /// Set `io-uring` sqpoll idle milliseconds, when `sqpoll_idle` is set,
490    /// io-uring sqpoll feature will be enabled
491    ///
492    /// # Notes
493    ///
494    /// - Only effective when the `io-uring` feature is enabled
495    /// - `idle` must >= 1ms, otherwise will set sqpoll idle 0ms
496    /// - `idle` will be rounded down
497    pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self {
498        self.sqpoll_idle = Some(idle);
499        self
500    }
501
502    /// `coop_taskrun` feature has been available since Linux Kernel 5.19. This
503    /// will optimize performance for most cases, especially compio is a single
504    /// thread runtime.
505    ///
506    /// However, it can't run with sqpoll feature.
507    ///
508    /// # Notes
509    ///
510    /// - Only effective when the `io-uring` feature is enabled
511    pub fn coop_taskrun(&mut self, enable: bool) -> &mut Self {
512        self.coop_taskrun = enable;
513        self
514    }
515
516    /// `taskrun_flag` feature has been available since Linux Kernel 5.19. This
517    /// allows io-uring driver can know if any cqes are available when try to
518    /// push sqe to sq. This should be enabled with
519    /// [`coop_taskrun`](Self::coop_taskrun)
520    ///
521    /// # Notes
522    ///
523    /// - Only effective when the `io-uring` feature is enabled
524    pub fn taskrun_flag(&mut self, enable: bool) -> &mut Self {
525        self.taskrun_flag = enable;
526        self
527    }
528
529    /// Register an eventfd to io-uring.
530    ///
531    /// # Notes
532    ///
533    /// - Only effective when the `io-uring` feature is enabled
534    pub fn register_eventfd(&mut self, fd: RawFd) -> &mut Self {
535        self.eventfd = Some(fd);
536        self
537    }
538
539    /// Force a driver type to use. It is ignored if the fusion driver is
540    /// disabled.
541    pub fn driver_type(&mut self, t: DriverType) -> &mut Self {
542        self.driver_type = Some(t);
543        self
544    }
545
546    /// Build the [`Proactor`].
547    pub fn build(&self) -> io::Result<Proactor> {
548        Proactor::with_builder(self)
549    }
550}