1#![cfg_attr(docsrs, feature(doc_cfg))]
6#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
7#![allow(unused_features)]
8#![warn(missing_docs)]
9#![deny(rustdoc::broken_intra_doc_links)]
10#![doc(
11 html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
12)]
13#![doc(
14 html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
15)]
16
17use std::{
18 io,
19 num::NonZero,
20 task::{Poll, Waker},
21 time::Duration,
22};
23
24use compio_buf::BufResult;
25use compio_log::instrument;
26
27mod control;
28mod macros;
29mod panic;
30
31mod key;
32pub use key::Key;
33
34mod asyncify;
35pub use asyncify::*;
36
37mod fd;
38pub use fd::*;
39
40mod driver_type;
41pub use driver_type::*;
42
43mod sys;
44pub use sys::{op, *};
45
46mod cancel;
47pub use cancel::*;
48
49mod buffer_pool;
50pub use buffer_pool::{BoxAllocator, BufferAllocator, BufferPool, BufferRef};
51
52use crate::{
53 buffer_pool::{BufferAlloc, BufferPoolRoot},
54 key::ErasedKey,
55 panic::resume_unwind_io,
56 sys::op::OpCodeFlag,
57};
58
/// Two-state result of submitting or collecting an operation.
///
/// `K` is the payload carried while the operation is still outstanding and
/// `R` is the payload carried once a result is available.
#[derive(Debug)]
pub enum PushEntry<K, R> {
    /// No result is available yet; carries the pending payload.
    Pending(K),
    /// A result is available; carries the ready payload.
    Ready(R),
}

impl<K, R> PushEntry<K, R> {
    /// Returns `true` if this entry is [`PushEntry::Ready`].
    pub const fn is_ready(&self) -> bool {
        match self {
            Self::Ready(_) => true,
            Self::Pending(_) => false,
        }
    }

    /// Consumes the entry and returns the ready payload, or `None` if the
    /// entry is still pending (the pending payload is dropped).
    pub fn take_ready(self) -> Option<R> {
        if let Self::Ready(value) = self {
            Some(value)
        } else {
            None
        }
    }

    /// Applies `f` to the pending payload, leaving a ready payload untouched.
    pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
        let pending = match self {
            Self::Ready(ready) => return PushEntry::Ready(ready),
            Self::Pending(pending) => pending,
        };
        PushEntry::Pending(f(pending))
    }

    /// Applies `f` to the ready payload, leaving a pending payload untouched.
    pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
        let ready = match self {
            Self::Pending(pending) => return PushEntry::Pending(pending),
            Self::Ready(ready) => ready,
        };
        PushEntry::Ready(f(ready))
    }
}
98
/// Wraps the platform `Driver` and exposes `push` / `poll` / `pop` on top of
/// it, together with an on-demand buffer pool.
pub struct Proactor {
 /// Platform driver that every operation is forwarded to.
 driver: Driver,
 /// Buffer pool state; the pool itself is created by the first
 /// `buffer_pool()` call.
 buffer_pool: BufferPoolState,
}
105
/// State of the proactor's buffer pool, which is created on first use.
enum BufferPoolState {
 /// The pool has not been created yet; holds the arguments that `get`
 /// passes to `BufferPoolRoot::new`.
 Uninit {
 /// Copied from the builder's `buffer_pool_allocator`.
 allocator: BufferAlloc,
 /// Copied from the builder's `buffer_pool_size`.
 num_of_bufs: u16,
 /// Copied from the builder's `buffer_pool_buffer_len`.
 buffer_len: usize,
 /// Copied from the builder's `buffer_pool_flag`.
 flags: u16,
 },
 /// The pool has been created; owns its root.
 Init(BufferPoolRoot),
}
115
116impl BufferPoolState {
117 fn get(&mut self, driver: &mut Driver) -> io::Result<BufferPool> {
118 loop {
119 match self {
120 BufferPoolState::Uninit {
121 allocator,
122 num_of_bufs,
123 buffer_len,
124 flags,
125 } => {
126 *self = BufferPoolState::Init(BufferPoolRoot::new(
127 driver,
128 *allocator,
129 *num_of_bufs,
130 *buffer_len,
131 *flags,
132 )?);
133 }
134 BufferPoolState::Init(root) => return Ok(root.get_pool()),
135 }
136 }
137 }
138}
139
140impl Drop for Proactor {
141 fn drop(&mut self) {
142 let BufferPoolState::Init(buffer_pool) = &mut self.buffer_pool else {
143 return;
144 };
145 debug_assert!(buffer_pool.is_unique()); _ = unsafe { buffer_pool.release(&mut self.driver) };
147 }
148}
149
// NOTE(review): presumably compile-time checks that `Proactor` implements
// neither `Send` nor `Sync`; the macro is defined outside this view — confirm.
assert_not_impl!(Proactor, Send);
assert_not_impl!(Proactor, Sync);
152
153impl Proactor {
154 pub fn new() -> io::Result<Self> {
156 Self::builder().build()
157 }
158
159 pub fn builder() -> ProactorBuilder {
161 ProactorBuilder::new()
162 }
163
164 fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
165 Ok(Self {
166 driver: Driver::new(builder)?,
167 buffer_pool: BufferPoolState::Uninit {
168 allocator: builder.buffer_pool_allocator,
169 num_of_bufs: builder.buffer_pool_size,
170 buffer_len: builder.buffer_pool_buffer_len,
171 flags: builder.buffer_pool_flag,
172 },
173 })
174 }
175
176 pub fn default_extra(&self) -> Extra {
178 Extra::new(&self.driver)
179 }
180
181 pub fn driver_type(&self) -> DriverType {
183 self.driver.driver_type()
184 }
185
186 pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
194 self.driver.attach(fd)
195 }
196
197 pub fn cancel<T: OpCode>(&mut self, key: Key<T>) -> Option<BufResult<usize, T>> {
204 instrument!(compio_log::Level::DEBUG, "cancel", ?key);
205 if key.set_cancelled() {
206 return None;
207 }
208 if key.is_unique() && key.has_result() {
209 let (res, buf) = key.take_result().into_parts();
210 Some(BufResult(resume_unwind_io(res), buf))
211 } else {
212 self.driver.cancel(key.erase());
213 None
214 }
215 }
216
217 pub fn cancel_token(&mut self, token: Cancel) -> bool {
225 instrument!(compio_log::Level::DEBUG, "cancel_token", ?token);
226
227 let Some(key) = token.upgrade() else {
228 return false;
229 };
230 if key.set_cancelled() || key.has_result() {
231 return false;
232 }
233 self.driver.cancel(key);
234 true
235 }
236
237 pub fn register_cancel<T: OpCode>(&mut self, key: &Key<T>) -> Cancel {
246 Cancel::new(key)
247 }
248
249 pub fn push<T: sys::OpCode + 'static>(
252 &mut self,
253 op: T,
254 ) -> PushEntry<Key<T>, BufResult<usize, T>> {
255 self.push_with_extra(op, self.default_extra())
256 }
257
258 pub fn push_with_extra<T: sys::OpCode + 'static>(
261 &mut self,
262 op: T,
263 extra: Extra,
264 ) -> PushEntry<Key<T>, BufResult<usize, T>> {
265 let key = Key::new(op, extra, self.driver_type());
266 match self.driver.push(key.clone().erase()) {
267 Poll::Pending => PushEntry::Pending(key),
268 Poll::Ready(res) => {
269 key.set_result(res);
270 PushEntry::Ready(key.take_result())
271 }
272 }
273 }
274
275 pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
279 self.driver.poll(timeout)
280 }
281
282 pub fn pop<T: OpCode>(&mut self, key: Key<T>) -> PushEntry<Key<T>, BufResult<usize, T>> {
289 instrument!(compio_log::Level::DEBUG, "pop", ?key);
290 if key.has_result() {
291 let (res, buf) = key.take_result().into_parts();
292 PushEntry::Ready(BufResult(resume_unwind_io(res), buf))
293 } else {
294 PushEntry::Pending(key)
295 }
296 }
297
298 pub fn pop_with_extra<T: OpCode>(
306 &mut self,
307 key: Key<T>,
308 ) -> PushEntry<Key<T>, (BufResult<usize, T>, Extra)> {
309 instrument!(compio_log::Level::DEBUG, "pop", ?key);
310 if key.has_result() {
311 let extra = key.swap_extra(self.default_extra());
312 let (res, buf) = key.take_result().into_parts();
313 PushEntry::Ready((BufResult(resume_unwind_io(res), buf), extra))
314 } else {
315 PushEntry::Pending(key)
316 }
317 }
318
319 pub fn pop_multishot<T: OpCode>(&mut self, key: &Key<T>) -> Option<BufResult<usize, Extra>> {
323 instrument!(compio_log::Level::DEBUG, "pop_multishot", ?key);
324 self.driver.pop_multishot(key)
325 }
326
327 pub fn update_waker<T>(&mut self, op: &Key<T>, waker: &Waker) {
329 op.set_waker(waker);
330 }
331
332 pub fn waker(&self) -> Waker {
334 self.driver.waker()
335 }
336
337 pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
344 fn unsupported(_: &[RawFd]) -> io::Error {
345 io::Error::new(
346 io::ErrorKind::Unsupported,
347 "Fixed-file registration is only supported on io-uring driver",
348 )
349 }
350
351 #[cfg(io_uring)]
352 match self.driver.as_iour() {
353 Some(iour) => iour.register_files(fds),
354 None => Err(unsupported(fds)),
355 }
356
357 #[cfg(not(io_uring))]
358 Err(unsupported(fds))
359 }
360
361 pub fn unregister_files(&self) -> io::Result<()> {
368 fn unsupported() -> io::Error {
369 io::Error::new(
370 io::ErrorKind::Unsupported,
371 "Fixed-file unregistration is only supported on io-uring driver",
372 )
373 }
374
375 #[cfg(io_uring)]
376 match self.driver.as_iour() {
377 Some(iour) => iour.unregister_files(),
378 None => Err(unsupported()),
379 }
380
381 #[cfg(not(io_uring))]
382 Err(unsupported())
383 }
384
385 pub fn register_personality(&self) -> io::Result<u16> {
397 fn unsupported() -> io::Error {
398 io::Error::new(
399 io::ErrorKind::Unsupported,
400 "Personality is only supported on io-uring driver",
401 )
402 }
403
404 #[cfg(io_uring)]
405 match self.driver.as_iour() {
406 Some(iour) => iour.register_personality(),
407 None => Err(unsupported()),
408 }
409
410 #[cfg(not(io_uring))]
411 Err(unsupported())
412 }
413
414 pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
423 fn unsupported(_: u16) -> io::Error {
424 io::Error::new(
425 io::ErrorKind::Unsupported,
426 "Personality is only supported on io-uring driver",
427 )
428 }
429
430 #[cfg(io_uring)]
431 match self.driver.as_iour() {
432 Some(iour) => iour.unregister_personality(personality),
433 None => Err(unsupported(personality)),
434 }
435
436 #[cfg(not(io_uring))]
437 Err(unsupported(personality))
438 }
439
440 pub fn buffer_pool(&mut self) -> io::Result<BufferPool> {
445 self.buffer_pool.get(&mut self.driver)
446 }
447}
448
/// Exposes the raw descriptor of the underlying driver.
impl AsRawFd for Proactor {
 fn as_raw_fd(&self) -> RawFd {
 self.driver.as_raw_fd()
 }
}
454
/// An operation key paired with the result that `notify` delivers to it.
#[derive(Debug)]
pub(crate) struct Entry {
 /// Type-erased key of the operation.
 key: ErasedKey,
 /// Result stored on the key by `notify`.
 result: io::Result<usize>,

 /// Flags copied into the key's extra data by `notify` (io-uring builds
 /// only).
 #[cfg(io_uring)]
 flags: u32,
}
467
// SAFETY: NOTE(review): the justification is not visible in this file; it
// presumably rests on how `ErasedKey` may be moved and shared between
// threads — confirm against the `key` module.
unsafe impl Send for Entry {}
unsafe impl Sync for Entry {}
470
471impl Entry {
472 pub(crate) fn new(key: ErasedKey, result: io::Result<usize>) -> Self {
473 #[cfg(not(io_uring))]
474 {
475 Self { key, result }
476 }
477 #[cfg(io_uring)]
478 {
479 Self {
480 key,
481 result,
482 flags: 0,
483 }
484 }
485 }
486
487 #[allow(dead_code)]
488 pub fn user_data(&self) -> usize {
489 self.key.as_raw()
490 }
491
492 #[allow(dead_code)]
493 pub fn into_key(self) -> ErasedKey {
494 self.key
495 }
496
497 #[cfg(io_uring)]
498 pub fn flags(&self) -> u32 {
499 self.flags
500 }
501
502 #[cfg(io_uring)]
503 pub(crate) fn set_flags(&mut self, flags: u32) {
504 self.flags = flags;
505 }
506
507 pub fn notify(self) {
508 #[cfg(io_uring)]
509 self.key.borrow().extra_mut().set_flags(self.flags());
510 self.key.set_result(self.result);
511 }
512}
513
/// How the proactor obtains its `AsyncifyPool`: by creating one or by reusing
/// an existing one.
#[derive(Debug, Clone)]
enum ThreadPoolBuilder {
 /// Create a pool with `AsyncifyPool::new(limit, recv_limit)`.
 Create { limit: usize, recv_limit: Duration },
 /// Reuse (clone) the given pool.
 Reuse(AsyncifyPool),
}
519
impl Default for ThreadPoolBuilder {
 /// Same as `ThreadPoolBuilder::new`.
 fn default() -> Self {
 Self::new()
 }
}
525
526impl ThreadPoolBuilder {
527 pub fn new() -> Self {
528 Self::Create {
529 limit: 256,
530 recv_limit: Duration::from_secs(60),
531 }
532 }
533
534 pub fn create_or_reuse(&self) -> AsyncifyPool {
535 match self {
536 Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit),
537 Self::Reuse(pool) => pool.clone(),
538 }
539 }
540}
541
/// Builder holding the configuration from which a `Proactor` is created.
///
/// NOTE(review): most fields are consumed outside this file (presumably by
/// `Driver::new`); the field notes below only state what this file shows.
#[derive(Debug, Clone)]
pub struct ProactorBuilder {
 /// Set by `capacity()`; defaults to 1024.
 capacity: u32,
 /// Thread pool configuration: create a new pool or reuse an existing one.
 pool_builder: ThreadPoolBuilder,
 /// Set by `sqpoll_idle()`; `None` by default.
 sqpoll_idle: Option<Duration>,
 /// Set by `cqsize()`; `None` by default.
 cqsize: Option<u32>,
 /// Set by `coop_taskrun()`; `false` by default.
 coop_taskrun: bool,
 /// Set by `taskrun_flag()`; `false` by default.
 taskrun_flag: bool,
 /// Set by `register_eventfd()`; `None` by default.
 eventfd: Option<RawFd>,
 /// Set by `driver_type()`; `None` by default.
 driver_type: Option<DriverType>,
 /// Set by `detect_opcode_support()`; empty by default.
 op_flags: OpCodeFlag,
 /// Passed as `num_of_bufs` when the buffer pool is created; defaults to 8.
 buffer_pool_size: u16,
 /// Passed as `flags` when the buffer pool is created; defaults to 0.
 buffer_pool_flag: u16,
 /// Passed as `buffer_len` when the buffer pool is created; defaults to
 /// 8192.
 buffer_pool_buffer_len: usize,
 /// Passed as `allocator` when the buffer pool is created; defaults to the
 /// `BoxAllocator`-based one.
 buffer_pool_allocator: BufferAlloc,
}
559
// SAFETY: NOTE(review): the justification is not visible in this file;
// presumably some field type (e.g. `RawFd` on some platforms, or
// `BufferAlloc`) is not automatically `Send`/`Sync` although the builder only
// stores plain configuration — confirm before relying on it.
unsafe impl Send for ProactorBuilder {}
unsafe impl Sync for ProactorBuilder {}
563
impl Default for ProactorBuilder {
 /// Same as `ProactorBuilder::new`.
 fn default() -> Self {
 Self::new()
 }
}
569
570impl ProactorBuilder {
571 pub fn new() -> Self {
573 Self {
574 capacity: 1024,
575 pool_builder: ThreadPoolBuilder::new(),
576 sqpoll_idle: None,
577 cqsize: None,
578 coop_taskrun: false,
579 taskrun_flag: false,
580 eventfd: None,
581 driver_type: None,
582 op_flags: OpCodeFlag::empty(),
583 buffer_pool_size: 8,
584 buffer_pool_flag: 0,
585 buffer_pool_buffer_len: 8192,
586 buffer_pool_allocator: BufferAlloc::new::<BoxAllocator>(),
587 }
588 }
589
590 pub fn capacity(&mut self, capacity: u32) -> &mut Self {
593 self.capacity = capacity;
594 self
595 }
596
597 pub fn cqsize(&mut self, cqsize: u32) -> &mut Self {
600 self.cqsize = Some(cqsize);
601 self
602 }
603
604 pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
614 if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder {
615 *limit = value;
616 }
617 self
618 }
619
620 pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
625 if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder {
626 *recv_limit = timeout;
627 }
628 self
629 }
630
631 pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
633 self.pool_builder = ThreadPoolBuilder::Reuse(pool);
634 self
635 }
636
637 pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
640 self.reuse_thread_pool(self.create_or_get_thread_pool());
641 self
642 }
643
644 pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
646 self.pool_builder.create_or_reuse()
647 }
648
649 pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self {
659 self.sqpoll_idle = Some(idle);
660 self
661 }
662
663 pub fn coop_taskrun(&mut self, enable: bool) -> &mut Self {
673 self.coop_taskrun = enable;
674 self
675 }
676
677 pub fn taskrun_flag(&mut self, enable: bool) -> &mut Self {
687 self.taskrun_flag = enable;
688 self
689 }
690
691 pub fn register_eventfd(&mut self, fd: RawFd) -> &mut Self {
697 self.eventfd = Some(fd);
698 self
699 }
700
701 pub fn detect_opcode_support(&mut self, flags: OpCodeFlag) -> &mut Self {
719 self.op_flags = flags;
720 self
721 }
722
723 pub fn driver_type(&mut self, t: DriverType) -> &mut Self {
727 self.driver_type = Some(t);
728 self
729 }
730
731 pub fn buffer_pool_size(&mut self, size: NonZero<u16>) -> &mut Self {
737 self.buffer_pool_size = size.get();
738 self
739 }
740
741 pub fn buffer_pool_flag(&mut self, flag: u16) -> &mut Self {
747 self.buffer_pool_flag = flag;
748 self
749 }
750
751 pub fn buffer_pool_buffer_len(&mut self, size: usize) -> &mut Self {
755 self.buffer_pool_buffer_len = size;
756 self
757 }
758
759 pub fn buffer_pool_allocator<A: BufferAllocator>(&mut self) -> &mut Self {
776 self.buffer_pool_allocator = BufferAlloc::new::<A>();
777 self
778 }
779
780 pub fn build(&self) -> io::Result<Proactor> {
782 Proactor::with_builder(self)
783 }
784}
785
// Private module holding the supertrait that seals `ErrorExt`.
mod seal {
 use std::io;

 use compio_buf::BufResult;

 /// Crate-private marker required by `ErrorExt`; only the types listed
 /// below implement it.
 pub(crate) trait Seal {}

 impl Seal for io::Error {}
 impl<T> Seal for io::Result<T> {}
 impl<T, B> Seal for BufResult<T, B> {}
}
797
798#[allow(private_bounds)]
800pub trait ErrorExt: seal::Seal {
801 #[doc(hidden)]
802 fn as_io_error(&self) -> Option<&io::Error>;
803
804 fn is_cancelled(&self) -> bool {
806 #[cfg(unix)]
807 const CANCEL_ERROR: i32 = libc::ECANCELED;
808 #[cfg(windows)]
809 const CANCEL_ERROR: i32 = windows_sys::Win32::Foundation::ERROR_OPERATION_ABORTED as _;
810
811 self.as_io_error()
812 .and_then(io::Error::raw_os_error)
813 .is_some_and(|e| e == CANCEL_ERROR)
814 }
815}
816
impl ErrorExt for io::Error {
 // An error is always its own `io::Error`.
 fn as_io_error(&self) -> Option<&io::Error> {
 Some(self)
 }
}
822
impl<T> ErrorExt for io::Result<T> {
 // Yields the error of an `Err`, and `None` for an `Ok`.
 fn as_io_error(&self) -> Option<&io::Error> {
 self.as_ref().err()
 }
}
828
impl<T, B> ErrorExt for BufResult<T, B> {
 // Delegates to the `io::Result` stored in the first field.
 fn as_io_error(&self) -> Option<&io::Error> {
 self.0.as_io_error()
 }
}