Skip to main content

compio_driver/
lib.rs

1//! Platform-specific drivers.
2//!
3//! Some types differ by compilation target.
4
5#![cfg_attr(docsrs, feature(doc_cfg))]
6#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
7#![allow(unused_features)]
8#![warn(missing_docs)]
9#![deny(rustdoc::broken_intra_doc_links)]
10#![doc(
11    html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
12)]
13#![doc(
14    html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
15)]
16
17use std::{
18    io,
19    num::NonZero,
20    task::{Poll, Waker},
21    time::Duration,
22};
23
24use compio_buf::BufResult;
25use compio_log::instrument;
26
27mod control;
28mod macros;
29mod panic;
30
31mod key;
32pub use key::Key;
33
34mod asyncify;
35pub use asyncify::*;
36
37mod fd;
38pub use fd::*;
39
40mod driver_type;
41pub use driver_type::*;
42
43mod sys;
44pub use sys::{op, *};
45
46mod cancel;
47pub use cancel::*;
48
49mod buffer_pool;
50pub use buffer_pool::{BoxAllocator, BufferAllocator, BufferPool, BufferRef};
51
52use crate::{
53    buffer_pool::{BufferAlloc, BufferPoolRoot},
54    key::ErasedKey,
55    panic::resume_unwind_io,
56    sys::op::OpCodeFlag,
57};
58
59/// The return type of [`Proactor::push`].
60#[derive(Debug)]
61pub enum PushEntry<K, R> {
62    /// The operation is pushed to the submission queue.
63    Pending(K),
64    /// The operation is ready and returns.
65    Ready(R),
66}
67
68impl<K, R> PushEntry<K, R> {
69    /// Get if the current variant is [`PushEntry::Ready`].
70    pub const fn is_ready(&self) -> bool {
71        matches!(self, Self::Ready(_))
72    }
73
74    /// Take the ready variant if exists.
75    pub fn take_ready(self) -> Option<R> {
76        match self {
77            Self::Pending(_) => None,
78            Self::Ready(res) => Some(res),
79        }
80    }
81
82    /// Map the [`PushEntry::Pending`] branch.
83    pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
84        match self {
85            Self::Pending(k) => PushEntry::Pending(f(k)),
86            Self::Ready(r) => PushEntry::Ready(r),
87        }
88    }
89
90    /// Map the [`PushEntry::Ready`] branch.
91    pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
92        match self {
93            Self::Pending(k) => PushEntry::Pending(k),
94            Self::Ready(r) => PushEntry::Ready(f(r)),
95        }
96    }
97}
98
99/// Low-level actions of completion-based IO.
100/// It owns the operations to keep the driver safe.
101pub struct Proactor {
102    driver: Driver,
103    buffer_pool: BufferPoolState,
104}
105
106enum BufferPoolState {
107    Uninit {
108        allocator: BufferAlloc,
109        num_of_bufs: u16,
110        buffer_len: usize,
111        flags: u16,
112    },
113    Init(BufferPoolRoot),
114}
115
116impl BufferPoolState {
117    fn get(&mut self, driver: &mut Driver) -> io::Result<BufferPool> {
118        loop {
119            match self {
120                BufferPoolState::Uninit {
121                    allocator,
122                    num_of_bufs,
123                    buffer_len,
124                    flags,
125                } => {
126                    *self = BufferPoolState::Init(BufferPoolRoot::new(
127                        driver,
128                        *allocator,
129                        *num_of_bufs,
130                        *buffer_len,
131                        *flags,
132                    )?);
133                }
134                BufferPoolState::Init(root) => return Ok(root.get_pool()),
135            }
136        }
137    }
138}
139
140impl Drop for Proactor {
141    fn drop(&mut self) {
142        let BufferPoolState::Init(buffer_pool) = &mut self.buffer_pool else {
143            return;
144        };
145        debug_assert!(buffer_pool.is_unique()); // Just in case. Shouldn't happen
146        _ = unsafe { buffer_pool.release(&mut self.driver) };
147    }
148}
149
150assert_not_impl!(Proactor, Send);
151assert_not_impl!(Proactor, Sync);
152
153impl Proactor {
154    /// Create [`Proactor`] with 1024 entries.
155    pub fn new() -> io::Result<Self> {
156        Self::builder().build()
157    }
158
159    /// Create [`ProactorBuilder`] to config the proactor.
160    pub fn builder() -> ProactorBuilder {
161        ProactorBuilder::new()
162    }
163
164    fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
165        Ok(Self {
166            driver: Driver::new(builder)?,
167            buffer_pool: BufferPoolState::Uninit {
168                allocator: builder.buffer_pool_allocator,
169                num_of_bufs: builder.buffer_pool_size,
170                buffer_len: builder.buffer_pool_buffer_len,
171                flags: builder.buffer_pool_flag,
172            },
173        })
174    }
175
176    /// Get a default [`Extra`] for underlying driver.
177    pub fn default_extra(&self) -> Extra {
178        Extra::new(&self.driver)
179    }
180
181    /// The current driver type.
182    pub fn driver_type(&self) -> DriverType {
183        self.driver.driver_type()
184    }
185
186    /// Attach an fd to the driver.
187    ///
188    /// ## Platform specific
189    /// * IOCP: it will be attached to the completion port. An fd could only be
190    ///   attached to one driver, and could only be attached once, even if you
191    ///   `try_clone` it.
192    /// * io-uring & polling: it will do nothing but return `Ok(())`.
193    pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
194        self.driver.attach(fd)
195    }
196
197    /// Cancel an operation with the pushed [`Key`].
198    ///
199    /// Returns the result if the key is unique and the operation is completed.
200    ///
201    /// The cancellation is not reliable. The underlying operation may continue,
202    /// but just don't return from [`Proactor::poll`].
203    pub fn cancel<T: OpCode>(&mut self, key: Key<T>) -> Option<BufResult<usize, T>> {
204        instrument!(compio_log::Level::DEBUG, "cancel", ?key);
205        if key.set_cancelled() {
206            return None;
207        }
208        if key.is_unique() && key.has_result() {
209            let (res, buf) = key.take_result().into_parts();
210            Some(BufResult(resume_unwind_io(res), buf))
211        } else {
212            self.driver.cancel(key.erase());
213            None
214        }
215    }
216
217    /// Cancel an operation with a [`Cancel`] token.
218    ///
219    /// Returns if a cancellation has been issued.
220    ///
221    /// The cancellation is not reliable. The underlying operation may continue,
222    /// but just don't return from [`Proactor::pop`]. This will do nothing if
223    /// the operation has already been completed or cancelled before.
224    pub fn cancel_token(&mut self, token: Cancel) -> bool {
225        instrument!(compio_log::Level::DEBUG, "cancel_token", ?token);
226
227        let Some(key) = token.upgrade() else {
228            return false;
229        };
230        if key.set_cancelled() || key.has_result() {
231            return false;
232        }
233        self.driver.cancel(key);
234        true
235    }
236
237    /// Create a [`Cancel`] that can be used to cancel the operation even
238    /// without the key.
239    ///
240    /// This acts like a weak reference to the [`Key`], but can only be used to
241    /// cancel the operation with [`Proactor::cancel_token`]. Extra copy of
242    /// [`Key`] may cause [`Proactor::pop`] to panic while keys registered
243    /// as [`Cancel`] will not. So this is useful in cases where you're not sure
244    /// if the operation will be cancelled.
245    pub fn register_cancel<T: OpCode>(&mut self, key: &Key<T>) -> Cancel {
246        Cancel::new(key)
247    }
248
249    /// Push an operation into the driver, and return the unique key [`Key`],
250    /// associated with it.
251    pub fn push<T: sys::OpCode + 'static>(
252        &mut self,
253        op: T,
254    ) -> PushEntry<Key<T>, BufResult<usize, T>> {
255        self.push_with_extra(op, self.default_extra())
256    }
257
258    /// Push an operation into the driver with user-defined [`Extra`], and
259    /// return the unique key [`Key`], associated with it.
260    pub fn push_with_extra<T: sys::OpCode + 'static>(
261        &mut self,
262        op: T,
263        extra: Extra,
264    ) -> PushEntry<Key<T>, BufResult<usize, T>> {
265        let key = Key::new(op, extra, self.driver_type());
266        match self.driver.push(key.clone().erase()) {
267            Poll::Pending => PushEntry::Pending(key),
268            Poll::Ready(res) => {
269                key.set_result(res);
270                PushEntry::Ready(key.take_result())
271            }
272        }
273    }
274
275    /// Poll the driver and get completed entries.
276    /// You need to call [`Proactor::pop`] to get the pushed
277    /// operations.
278    pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
279        self.driver.poll(timeout)
280    }
281
282    /// Get the pushed operations from the completion entries.
283    ///
284    /// # Panics
285    ///
286    /// This function will panic if the [`Key`] is not unique or if the
287    /// operation is blocking and it panicked in the thread pool.
288    pub fn pop<T: OpCode>(&mut self, key: Key<T>) -> PushEntry<Key<T>, BufResult<usize, T>> {
289        instrument!(compio_log::Level::DEBUG, "pop", ?key);
290        if key.has_result() {
291            let (res, buf) = key.take_result().into_parts();
292            PushEntry::Ready(BufResult(resume_unwind_io(res), buf))
293        } else {
294            PushEntry::Pending(key)
295        }
296    }
297
298    /// Get the pushed operations from the completion entries along the
299    /// [`Extra`] associated.
300    ///
301    /// # Panics
302    ///
303    /// This function will panic if the [`Key`] is not unique or if the
304    /// operation is blocking and it panicked in the thread pool.
305    pub fn pop_with_extra<T: OpCode>(
306        &mut self,
307        key: Key<T>,
308    ) -> PushEntry<Key<T>, (BufResult<usize, T>, Extra)> {
309        instrument!(compio_log::Level::DEBUG, "pop", ?key);
310        if key.has_result() {
311            let extra = key.swap_extra(self.default_extra());
312            let (res, buf) = key.take_result().into_parts();
313            PushEntry::Ready((BufResult(resume_unwind_io(res), buf), extra))
314        } else {
315            PushEntry::Pending(key)
316        }
317    }
318
319    /// Get one completion entry for a multishot operation. If it returns
320    /// [`None`], the user should call [`Proactor::pop_with_extra`] to get the
321    /// final result of the operation.
322    pub fn pop_multishot<T: OpCode>(&mut self, key: &Key<T>) -> Option<BufResult<usize, Extra>> {
323        instrument!(compio_log::Level::DEBUG, "pop_multishot", ?key);
324        self.driver.pop_multishot(key)
325    }
326
327    /// Update the waker of the specified op.
328    pub fn update_waker<T>(&mut self, op: &Key<T>, waker: &Waker) {
329        op.set_waker(waker);
330    }
331
332    /// Create a waker to interrupt the inner driver.
333    pub fn waker(&self) -> Waker {
334        self.driver.waker()
335    }
336
337    /// Register file descriptors for fixed-file operations with io_uring.
338    ///
339    /// This only works on `io_uring` driver. It will return an [`Unsupported`]
340    /// error on other drivers.
341    ///
342    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
343    pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
344        fn unsupported(_: &[RawFd]) -> io::Error {
345            io::Error::new(
346                io::ErrorKind::Unsupported,
347                "Fixed-file registration is only supported on io-uring driver",
348            )
349        }
350
351        #[cfg(io_uring)]
352        match self.driver.as_iour() {
353            Some(iour) => iour.register_files(fds),
354            None => Err(unsupported(fds)),
355        }
356
357        #[cfg(not(io_uring))]
358        Err(unsupported(fds))
359    }
360
361    /// Unregister previously registered file descriptors.
362    ///
363    /// This only works on `io_uring` driver. It will return an [`Unsupported`]
364    /// error on other drivers.
365    ///
366    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
367    pub fn unregister_files(&self) -> io::Result<()> {
368        fn unsupported() -> io::Error {
369            io::Error::new(
370                io::ErrorKind::Unsupported,
371                "Fixed-file unregistration is only supported on io-uring driver",
372            )
373        }
374
375        #[cfg(io_uring)]
376        match self.driver.as_iour() {
377            Some(iour) => iour.unregister_files(),
378            None => Err(unsupported()),
379        }
380
381        #[cfg(not(io_uring))]
382        Err(unsupported())
383    }
384
385    /// Register a new personality in io-uring driver.
386    ///
387    /// Returns the personality id, which can be used with
388    /// [`Extra::set_personality`] to set the personality for an operation.
389    ///
390    /// This only works on `io_uring` driver. It will return an [`Unsupported`]
391    /// error on other drivers. See [`Submitter::register_personality`] for
392    /// more.
393    ///
394    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
395    /// [`Submitter::register_personality`]: https://docs.rs/io-uring/latest/io_uring/struct.Submitter.html#method.register_personality
396    pub fn register_personality(&self) -> io::Result<u16> {
397        fn unsupported() -> io::Error {
398            io::Error::new(
399                io::ErrorKind::Unsupported,
400                "Personality is only supported on io-uring driver",
401            )
402        }
403
404        #[cfg(io_uring)]
405        match self.driver.as_iour() {
406            Some(iour) => iour.register_personality(),
407            None => Err(unsupported()),
408        }
409
410        #[cfg(not(io_uring))]
411        Err(unsupported())
412    }
413
414    /// Unregister the given personality in io-uring driver.
415    ///
416    /// This only works on `io_uring` driver. It will return an [`Unsupported`]
417    /// error on other drivers. See [`Submitter::unregister_personality`] for
418    /// more.
419    ///
420    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
421    /// [`Submitter::unregister_personality`]: https://docs.rs/io-uring/latest/io_uring/struct.Submitter.html#method.unregister_personality
422    pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
423        fn unsupported(_: u16) -> io::Error {
424            io::Error::new(
425                io::ErrorKind::Unsupported,
426                "Personality is only supported on io-uring driver",
427            )
428        }
429
430        #[cfg(io_uring)]
431        match self.driver.as_iour() {
432            Some(iour) => iour.unregister_personality(personality),
433            None => Err(unsupported(personality)),
434        }
435
436        #[cfg(not(io_uring))]
437        Err(unsupported(personality))
438    }
439
440    /// Get the buffer pool of the driver.
441    ///
442    /// This will lazily initialize the pool at the first time it's accessed,
443    /// and future access to the pool will be cheap and infallible.
444    pub fn buffer_pool(&mut self) -> io::Result<BufferPool> {
445        self.buffer_pool.get(&mut self.driver)
446    }
447}
448
449impl AsRawFd for Proactor {
450    fn as_raw_fd(&self) -> RawFd {
451        self.driver.as_raw_fd()
452    }
453}
454
455/// An completed entry returned from kernel.
456///
457/// This represents the ownership of [`Key`] passed into the kernel is given
458/// back from it to the driver.
459#[derive(Debug)]
460pub(crate) struct Entry {
461    key: ErasedKey,
462    result: io::Result<usize>,
463
464    #[cfg(io_uring)]
465    flags: u32,
466}
467
468unsafe impl Send for Entry {}
469unsafe impl Sync for Entry {}
470
471impl Entry {
472    pub(crate) fn new(key: ErasedKey, result: io::Result<usize>) -> Self {
473        #[cfg(not(io_uring))]
474        {
475            Self { key, result }
476        }
477        #[cfg(io_uring)]
478        {
479            Self {
480                key,
481                result,
482                flags: 0,
483            }
484        }
485    }
486
487    #[allow(dead_code)]
488    pub fn user_data(&self) -> usize {
489        self.key.as_raw()
490    }
491
492    #[allow(dead_code)]
493    pub fn into_key(self) -> ErasedKey {
494        self.key
495    }
496
497    #[cfg(io_uring)]
498    pub fn flags(&self) -> u32 {
499        self.flags
500    }
501
502    #[cfg(io_uring)]
503    pub(crate) fn set_flags(&mut self, flags: u32) {
504        self.flags = flags;
505    }
506
507    pub fn notify(self) {
508        #[cfg(io_uring)]
509        self.key.borrow().extra_mut().set_flags(self.flags());
510        self.key.set_result(self.result);
511    }
512}
513
514#[derive(Debug, Clone)]
515enum ThreadPoolBuilder {
516    Create { limit: usize, recv_limit: Duration },
517    Reuse(AsyncifyPool),
518}
519
520impl Default for ThreadPoolBuilder {
521    fn default() -> Self {
522        Self::new()
523    }
524}
525
526impl ThreadPoolBuilder {
527    pub fn new() -> Self {
528        Self::Create {
529            limit: 256,
530            recv_limit: Duration::from_secs(60),
531        }
532    }
533
534    pub fn create_or_reuse(&self) -> AsyncifyPool {
535        match self {
536            Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit),
537            Self::Reuse(pool) => pool.clone(),
538        }
539    }
540}
541
542/// Builder for [`Proactor`].
543#[derive(Debug, Clone)]
544pub struct ProactorBuilder {
545    capacity: u32,
546    pool_builder: ThreadPoolBuilder,
547    sqpoll_idle: Option<Duration>,
548    cqsize: Option<u32>,
549    coop_taskrun: bool,
550    taskrun_flag: bool,
551    eventfd: Option<RawFd>,
552    driver_type: Option<DriverType>,
553    op_flags: OpCodeFlag,
554    buffer_pool_size: u16,
555    buffer_pool_flag: u16,
556    buffer_pool_buffer_len: usize,
557    buffer_pool_allocator: BufferAlloc,
558}
559
560// SAFETY: `RawFd` is thread safe.
561unsafe impl Send for ProactorBuilder {}
562unsafe impl Sync for ProactorBuilder {}
563
564impl Default for ProactorBuilder {
565    fn default() -> Self {
566        Self::new()
567    }
568}
569
570impl ProactorBuilder {
571    /// Create the builder with default config.
572    pub fn new() -> Self {
573        Self {
574            capacity: 1024,
575            pool_builder: ThreadPoolBuilder::new(),
576            sqpoll_idle: None,
577            cqsize: None,
578            coop_taskrun: false,
579            taskrun_flag: false,
580            eventfd: None,
581            driver_type: None,
582            op_flags: OpCodeFlag::empty(),
583            buffer_pool_size: 8,
584            buffer_pool_flag: 0,
585            buffer_pool_buffer_len: 8192,
586            buffer_pool_allocator: BufferAlloc::new::<BoxAllocator>(),
587        }
588    }
589
590    /// Set the capacity of the inner event queue or submission queue, if
591    /// exists. The default value is 1024.
592    pub fn capacity(&mut self, capacity: u32) -> &mut Self {
593        self.capacity = capacity;
594        self
595    }
596
597    /// Set the completion queue size of io-uring driver. The value should be
598    /// greater than `capacity`.
599    pub fn cqsize(&mut self, cqsize: u32) -> &mut Self {
600        self.cqsize = Some(cqsize);
601        self
602    }
603
604    /// Set the thread number limit of the inner thread pool, if exists. The
605    /// default value is 256.
606    ///
607    /// It will be ignored if `reuse_thread_pool` is set.
608    ///
609    /// Warning: some operations don't work if the limit is set to zero:
610    /// * `Asyncify` needs thread pool.
611    /// * Operations except `Recv*`, `Send*`, `Connect`, `Accept` may need
612    ///   thread pool.
613    pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
614        if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder {
615            *limit = value;
616        }
617        self
618    }
619
620    /// Set the waiting timeout of the inner thread, if exists. The default is
621    /// 60 seconds.
622    ///
623    /// It will be ignored if `reuse_thread_pool` is set.
624    pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
625        if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder {
626            *recv_limit = timeout;
627        }
628        self
629    }
630
631    /// Set to reuse an existing [`AsyncifyPool`] in this proactor.
632    pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
633        self.pool_builder = ThreadPoolBuilder::Reuse(pool);
634        self
635    }
636
637    /// Force reuse the thread pool for each proactor created by this builder,
638    /// even `reuse_thread_pool` is not set.
639    pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
640        self.reuse_thread_pool(self.create_or_get_thread_pool());
641        self
642    }
643
644    /// Create or reuse the thread pool from the config.
645    pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
646        self.pool_builder.create_or_reuse()
647    }
648
649    /// Set `io-uring` sqpoll idle duration,
650    ///
651    /// This will also enable io-uring's sqpoll feature.
652    ///
653    /// # Notes
654    ///
655    /// - Only effective when the `io-uring` feature is enabled
656    /// - `idle` must be >= 1ms, otherwise sqpoll idle will be set to 0 ms
657    /// - `idle` will be rounded down
658    pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self {
659        self.sqpoll_idle = Some(idle);
660        self
661    }
662
663    /// Optimize performance for most cases, especially compio is a single
664    /// thread runtime.
665    ///
666    /// However, it can't run with sqpoll feature.
667    ///
668    /// # Notes
669    ///
670    /// - Available since Linux Kernel 5.19.
671    /// - Only effective when the `io-uring` feature is enabled
672    pub fn coop_taskrun(&mut self, enable: bool) -> &mut Self {
673        self.coop_taskrun = enable;
674        self
675    }
676
677    /// Allows io-uring driver to know if any cqe's are available when try to
678    /// push an sqe to the submission queue.
679    ///
680    /// This should be enabled with [`coop_taskrun`](Self::coop_taskrun)
681    ///
682    /// # Notes
683    ///
684    /// - Available since Linux Kernel 5.19.
685    /// - Only effective when the `io-uring` feature is enabled
686    pub fn taskrun_flag(&mut self, enable: bool) -> &mut Self {
687        self.taskrun_flag = enable;
688        self
689    }
690
691    /// Register an eventfd to io-uring.
692    ///
693    /// # Notes
694    ///
695    /// - Only effective when the `io-uring` feature is enabled
696    pub fn register_eventfd(&mut self, fd: RawFd) -> &mut Self {
697        self.eventfd = Some(fd);
698        self
699    }
700
701    /// Set which io-uring [`OpCode`] must be supported by the driver.
702    ///
703    /// Support for io-uring opcodes varies by kernel version. Setting this
704    /// will force the driver to check for support of the specified opcodes, and
705    /// when any of them are not supported:
706    ///
707    /// - Fallback to `polling` driver if it is enabled, or
708    /// - Return an [`Unsupported`] error when building the proactor otherwise.
709    ///
710    /// # Notes
711    ///
712    /// - Only effective when the `io-uring` feature is enabled
713    /// - [`OpCodeFlag`] is a bitflag struct, you can combine multiple opcodes
714    ///   with bitwise OR or use [`OpCodeFlag::all`] to require all opcodes to
715    ///   be supported.
716    ///
717    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
718    pub fn detect_opcode_support(&mut self, flags: OpCodeFlag) -> &mut Self {
719        self.op_flags = flags;
720        self
721    }
722
723    /// Force a driver type to use.
724    ///
725    /// It is ignored if the fusion driver is disabled.
726    pub fn driver_type(&mut self, t: DriverType) -> &mut Self {
727        self.driver_type = Some(t);
728        self
729    }
730
731    /// Number of buffers in the buffer pool.
732    ///
733    /// `size` will be rounded up if it's not power of 2.
734    ///
735    /// Default to be `8`.
736    pub fn buffer_pool_size(&mut self, size: NonZero<u16>) -> &mut Self {
737        self.buffer_pool_size = size.get();
738        self
739    }
740
741    /// Flag to be used to initialize buffer pool.
742    ///
743    /// This is only supported on io-uring driver.
744    ///
745    /// Default to be `0`.
746    pub fn buffer_pool_flag(&mut self, flag: u16) -> &mut Self {
747        self.buffer_pool_flag = flag;
748        self
749    }
750
751    /// Length of each buffer pool's buffer.
752    ///
753    /// Default to be `8192`.
754    pub fn buffer_pool_buffer_len(&mut self, size: usize) -> &mut Self {
755        self.buffer_pool_buffer_len = size;
756        self
757    }
758
759    /// Set the allocator for buffer pool.
760    ///
761    /// This is different from the std's unstable `Allocator` trait: it's purely
762    /// static and doesn't take an instance at all. This means implementation
763    /// should be global (e.g., `Global`, `malloc` or `mmap`).
764    ///
765    /// Default to [`BoxAllocator`].
766    ///
767    /// # Note
768    ///
769    /// Default allocator performs [very poor] when using managed i/o on Zen 3,
770    /// possibly due to [a bug related to FSRM]. If you observe such a problem,
771    /// try swap the allocator to a mmap-based one may solve it.
772    ///
773    /// [very poor]: https://github.com/compio-rs/compio/issues/472
774    /// [a bug related to FSRM]: https://bugs.launchpad.net/ubuntu/+source/glibc/+bug/2030515
775    pub fn buffer_pool_allocator<A: BufferAllocator>(&mut self) -> &mut Self {
776        self.buffer_pool_allocator = BufferAlloc::new::<A>();
777        self
778    }
779
780    /// Build the [`Proactor`].
781    pub fn build(&self) -> io::Result<Proactor> {
782        Proactor::with_builder(self)
783    }
784}
785
786mod seal {
787    use std::io;
788
789    use compio_buf::BufResult;
790
791    pub(crate) trait Seal {}
792
793    impl Seal for io::Error {}
794    impl<T> Seal for io::Result<T> {}
795    impl<T, B> Seal for BufResult<T, B> {}
796}
797
798/// Extension trait for [`io::Error`] and results with it.
799#[allow(private_bounds)]
800pub trait ErrorExt: seal::Seal {
801    #[doc(hidden)]
802    fn as_io_error(&self) -> Option<&io::Error>;
803
804    /// Whether the error or result is cancelled.
805    fn is_cancelled(&self) -> bool {
806        #[cfg(unix)]
807        const CANCEL_ERROR: i32 = libc::ECANCELED;
808        #[cfg(windows)]
809        const CANCEL_ERROR: i32 = windows_sys::Win32::Foundation::ERROR_OPERATION_ABORTED as _;
810
811        self.as_io_error()
812            .and_then(io::Error::raw_os_error)
813            .is_some_and(|e| e == CANCEL_ERROR)
814    }
815}
816
817impl ErrorExt for io::Error {
818    fn as_io_error(&self) -> Option<&io::Error> {
819        Some(self)
820    }
821}
822
823impl<T> ErrorExt for io::Result<T> {
824    fn as_io_error(&self) -> Option<&io::Error> {
825        self.as_ref().err()
826    }
827}
828
829impl<T, B> ErrorExt for BufResult<T, B> {
830    fn as_io_error(&self) -> Option<&io::Error> {
831        self.0.as_io_error()
832    }
833}