1#![cfg_attr(docsrs, feature(doc_cfg))]
5#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
6#![warn(missing_docs)]
7
8use std::{
9 io,
10 task::{Poll, Waker},
11 time::Duration,
12};
13
14use compio_buf::BufResult;
15use compio_log::instrument;
16
17mod key;
18pub use key::Key;
19
20pub mod op;
21#[cfg(unix)]
22#[cfg_attr(docsrs, doc(cfg(all())))]
23mod unix;
24#[cfg(unix)]
25use unix::Overlapped;
26
27mod asyncify;
28pub use asyncify::*;
29
30mod fd;
31pub use fd::*;
32
33mod driver_type;
34pub use driver_type::*;
35
36mod buffer_pool;
37pub use buffer_pool::*;
38
39cfg_if::cfg_if! {
40 if #[cfg(windows)] {
41 #[path = "iocp/mod.rs"]
42 mod sys;
43 } else if #[cfg(fusion)] {
44 #[path = "fusion/mod.rs"]
45 mod sys;
46 } else if #[cfg(io_uring)] {
47 #[path = "iour/mod.rs"]
48 mod sys;
49 } else if #[cfg(all(target_os = "linux", not(feature = "polling")))] {
50 #[path = "stub/mod.rs"]
51 mod sys;
52 } else if #[cfg(unix)] {
53 #[path = "poll/mod.rs"]
54 mod sys;
55 }
56}
57
58pub use sys::*;
59
60#[cfg(windows)]
61#[macro_export]
62#[doc(hidden)]
63macro_rules! syscall {
64 (BOOL, $e:expr) => {
65 $crate::syscall!($e, == 0)
66 };
67 (SOCKET, $e:expr) => {
68 $crate::syscall!($e, != 0)
69 };
70 (HANDLE, $e:expr) => {
71 $crate::syscall!($e, == ::windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE)
72 };
73 ($e:expr, $op: tt $rhs: expr) => {{
74 #[allow(unused_unsafe)]
75 let res = unsafe { $e };
76 if res $op $rhs {
77 Err(::std::io::Error::last_os_error())
78 } else {
79 Ok(res)
80 }
81 }};
82}
83
84#[cfg(unix)]
86#[macro_export]
87#[doc(hidden)]
88macro_rules! syscall {
89 (break $e:expr) => {
90 loop {
91 match $crate::syscall!($e) {
92 Ok(fd) => break ::std::task::Poll::Ready(Ok(fd as usize)),
93 Err(e) if e.kind() == ::std::io::ErrorKind::WouldBlock || e.raw_os_error() == Some(::libc::EINPROGRESS)
94 => break ::std::task::Poll::Pending,
95 Err(e) if e.kind() == ::std::io::ErrorKind::Interrupted => {},
96 Err(e) => break ::std::task::Poll::Ready(Err(e)),
97 }
98 }
99 };
100 ($e:expr, $f:ident($fd:expr)) => {
101 match $crate::syscall!(break $e) {
102 ::std::task::Poll::Pending => Ok($crate::sys::Decision::$f($fd)),
103 ::std::task::Poll::Ready(Ok(res)) => Ok($crate::sys::Decision::Completed(res)),
104 ::std::task::Poll::Ready(Err(e)) => Err(e),
105 }
106 };
107 ($e:expr) => {{
108 #[allow(unused_unsafe)]
109 let res = unsafe { $e };
110 if res == -1 {
111 Err(::std::io::Error::last_os_error())
112 } else {
113 Ok(res)
114 }
115 }};
116}
117
118#[macro_export]
119#[doc(hidden)]
120macro_rules! impl_raw_fd {
121 ($t:ty, $it:ty, $inner:ident) => {
122 impl $crate::AsRawFd for $t {
123 fn as_raw_fd(&self) -> $crate::RawFd {
124 self.$inner.as_raw_fd()
125 }
126 }
127 #[cfg(unix)]
128 impl std::os::fd::AsFd for $t {
129 fn as_fd(&self) -> std::os::fd::BorrowedFd<'_> {
130 self.$inner.as_fd()
131 }
132 }
133 #[cfg(unix)]
134 impl std::os::fd::FromRawFd for $t {
135 unsafe fn from_raw_fd(fd: $crate::RawFd) -> Self {
136 Self {
137 $inner: unsafe { std::os::fd::FromRawFd::from_raw_fd(fd) },
138 }
139 }
140 }
141 impl $crate::ToSharedFd<$it> for $t {
142 fn to_shared_fd(&self) -> $crate::SharedFd<$it> {
143 self.$inner.to_shared_fd()
144 }
145 }
146 };
147 ($t:ty, $it:ty, $inner:ident,file) => {
148 $crate::impl_raw_fd!($t, $it, $inner);
149 #[cfg(windows)]
150 impl std::os::windows::io::FromRawHandle for $t {
151 unsafe fn from_raw_handle(handle: std::os::windows::io::RawHandle) -> Self {
152 Self {
153 $inner: unsafe { std::os::windows::io::FromRawHandle::from_raw_handle(handle) },
154 }
155 }
156 }
157 #[cfg(windows)]
158 impl std::os::windows::io::AsHandle for $t {
159 fn as_handle(&self) -> std::os::windows::io::BorrowedHandle {
160 self.$inner.as_handle()
161 }
162 }
163 #[cfg(windows)]
164 impl std::os::windows::io::AsRawHandle for $t {
165 fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
166 self.$inner.as_raw_handle()
167 }
168 }
169 };
170 ($t:ty, $it:ty, $inner:ident,socket) => {
171 $crate::impl_raw_fd!($t, $it, $inner);
172 #[cfg(windows)]
173 impl std::os::windows::io::FromRawSocket for $t {
174 unsafe fn from_raw_socket(sock: std::os::windows::io::RawSocket) -> Self {
175 Self {
176 $inner: unsafe { std::os::windows::io::FromRawSocket::from_raw_socket(sock) },
177 }
178 }
179 }
180 #[cfg(windows)]
181 impl std::os::windows::io::AsSocket for $t {
182 fn as_socket(&self) -> std::os::windows::io::BorrowedSocket {
183 self.$inner.as_socket()
184 }
185 }
186 #[cfg(windows)]
187 impl std::os::windows::io::AsRawSocket for $t {
188 fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
189 self.$inner.as_raw_socket()
190 }
191 }
192 };
193}
194
195pub enum PushEntry<K, R> {
197 Pending(K),
199 Ready(R),
201}
202
203impl<K, R> PushEntry<K, R> {
204 pub const fn is_ready(&self) -> bool {
206 matches!(self, Self::Ready(_))
207 }
208
209 pub fn take_ready(self) -> Option<R> {
211 match self {
212 Self::Pending(_) => None,
213 Self::Ready(res) => Some(res),
214 }
215 }
216
217 pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
219 match self {
220 Self::Pending(k) => PushEntry::Pending(f(k)),
221 Self::Ready(r) => PushEntry::Ready(r),
222 }
223 }
224
225 pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
227 match self {
228 Self::Pending(k) => PushEntry::Pending(k),
229 Self::Ready(r) => PushEntry::Ready(f(r)),
230 }
231 }
232}
233
234pub struct Proactor {
237 driver: Driver,
238}
239
240impl Proactor {
241 pub fn new() -> io::Result<Self> {
243 Self::builder().build()
244 }
245
246 pub fn builder() -> ProactorBuilder {
248 ProactorBuilder::new()
249 }
250
251 fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
252 Ok(Self {
253 driver: Driver::new(builder)?,
254 })
255 }
256
257 pub fn driver_type(&self) -> DriverType {
259 self.driver.driver_type()
260 }
261
262 pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
270 self.driver.attach(fd)
271 }
272
273 pub fn cancel<T: OpCode>(&mut self, mut op: Key<T>) -> Option<BufResult<usize, T>> {
279 instrument!(compio_log::Level::DEBUG, "cancel", ?op);
280 if op.set_cancelled() {
281 Some(unsafe { op.into_inner() })
283 } else {
284 self.driver
285 .cancel(&mut unsafe { Key::<dyn OpCode>::new_unchecked(op.user_data()) });
286 None
287 }
288 }
289
290 pub fn push<T: OpCode + 'static>(&mut self, op: T) -> PushEntry<Key<T>, BufResult<usize, T>> {
293 let mut op = self.driver.create_op(op);
294 match self
295 .driver
296 .push(&mut unsafe { Key::<dyn OpCode>::new_unchecked(op.user_data()) })
297 {
298 Poll::Pending => PushEntry::Pending(op),
299 Poll::Ready(res) => {
300 op.set_result(res);
301 PushEntry::Ready(unsafe { op.into_inner() })
303 }
304 }
305 }
306
307 pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
311 self.driver.poll(timeout)
312 }
313
314 pub fn pop<T>(&mut self, op: Key<T>) -> PushEntry<Key<T>, (BufResult<usize, T>, u32)> {
320 instrument!(compio_log::Level::DEBUG, "pop", ?op);
321 if op.has_result() {
322 let flags = op.flags();
323 PushEntry::Ready((unsafe { op.into_inner() }, flags))
325 } else {
326 PushEntry::Pending(op)
327 }
328 }
329
330 pub fn update_waker<T>(&mut self, op: &mut Key<T>, waker: Waker) {
332 op.set_waker(waker);
333 }
334
335 pub fn waker(&self) -> Waker {
337 self.driver.waker()
338 }
339
340 pub fn create_buffer_pool(
347 &mut self,
348 buffer_len: u16,
349 buffer_size: usize,
350 ) -> io::Result<BufferPool> {
351 self.driver.create_buffer_pool(buffer_len, buffer_size)
352 }
353
354 pub unsafe fn release_buffer_pool(&mut self, buffer_pool: BufferPool) -> io::Result<()> {
361 unsafe { self.driver.release_buffer_pool(buffer_pool) }
362 }
363}
364
365impl AsRawFd for Proactor {
366 fn as_raw_fd(&self) -> RawFd {
367 self.driver.as_raw_fd()
368 }
369}
370
371#[derive(Debug)]
373pub(crate) struct Entry {
374 user_data: usize,
375 result: io::Result<usize>,
376 flags: u32,
377}
378
379impl Entry {
380 pub(crate) fn new(user_data: usize, result: io::Result<usize>) -> Self {
381 Self {
382 user_data,
383 result,
384 flags: 0,
385 }
386 }
387
388 #[cfg(io_uring)]
389 pub(crate) fn set_flags(&mut self, flags: u32) {
391 self.flags = flags;
392 }
393
394 pub fn user_data(&self) -> usize {
396 self.user_data
397 }
398
399 pub fn flags(&self) -> u32 {
400 self.flags
401 }
402
403 pub fn into_result(self) -> io::Result<usize> {
405 self.result
406 }
407
408 pub unsafe fn notify(self) {
412 let user_data = self.user_data();
413 let mut op = unsafe { Key::<()>::new_unchecked(user_data) };
414 op.set_flags(self.flags());
415 if op.set_result(self.into_result()) {
416 let _ = unsafe { op.into_box() };
418 }
419 }
420}
421
422#[derive(Debug, Clone)]
423enum ThreadPoolBuilder {
424 Create { limit: usize, recv_limit: Duration },
425 Reuse(AsyncifyPool),
426}
427
428impl Default for ThreadPoolBuilder {
429 fn default() -> Self {
430 Self::new()
431 }
432}
433
434impl ThreadPoolBuilder {
435 pub fn new() -> Self {
436 Self::Create {
437 limit: 256,
438 recv_limit: Duration::from_secs(60),
439 }
440 }
441
442 pub fn create_or_reuse(&self) -> AsyncifyPool {
443 match self {
444 Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit),
445 Self::Reuse(pool) => pool.clone(),
446 }
447 }
448}
449
450#[derive(Debug, Clone)]
452pub struct ProactorBuilder {
453 capacity: u32,
454 pool_builder: ThreadPoolBuilder,
455 sqpoll_idle: Option<Duration>,
456 coop_taskrun: bool,
457 taskrun_flag: bool,
458 eventfd: Option<RawFd>,
459 driver_type: Option<DriverType>,
460}
461
462unsafe impl Send for ProactorBuilder {}
464unsafe impl Sync for ProactorBuilder {}
465
466impl Default for ProactorBuilder {
467 fn default() -> Self {
468 Self::new()
469 }
470}
471
472impl ProactorBuilder {
473 pub fn new() -> Self {
475 Self {
476 capacity: 1024,
477 pool_builder: ThreadPoolBuilder::new(),
478 sqpoll_idle: None,
479 coop_taskrun: false,
480 taskrun_flag: false,
481 eventfd: None,
482 driver_type: None,
483 }
484 }
485
486 pub fn capacity(&mut self, capacity: u32) -> &mut Self {
489 self.capacity = capacity;
490 self
491 }
492
493 pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
503 if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder {
504 *limit = value;
505 }
506 self
507 }
508
509 pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
514 if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder {
515 *recv_limit = timeout;
516 }
517 self
518 }
519
520 pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
522 self.pool_builder = ThreadPoolBuilder::Reuse(pool);
523 self
524 }
525
526 pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
529 self.reuse_thread_pool(self.create_or_get_thread_pool());
530 self
531 }
532
533 pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
535 self.pool_builder.create_or_reuse()
536 }
537
538 pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self {
547 self.sqpoll_idle = Some(idle);
548 self
549 }
550
551 pub fn coop_taskrun(&mut self, enable: bool) -> &mut Self {
561 self.coop_taskrun = enable;
562 self
563 }
564
565 pub fn taskrun_flag(&mut self, enable: bool) -> &mut Self {
574 self.taskrun_flag = enable;
575 self
576 }
577
578 pub fn register_eventfd(&mut self, fd: RawFd) -> &mut Self {
584 self.eventfd = Some(fd);
585 self
586 }
587
588 pub fn driver_type(&mut self, t: DriverType) -> &mut Self {
591 self.driver_type = Some(t);
592 self
593 }
594
595 pub fn build(&self) -> io::Result<Proactor> {
597 Proactor::with_builder(self)
598 }
599}