1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
5#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
6#![warn(missing_docs)]
7
8#[cfg(all(
9 target_os = "linux",
10 not(feature = "io-uring"),
11 not(feature = "polling")
12))]
13compile_error!("You must choose at least one of these features: [\"io-uring\", \"polling\"]");
14
15use std::{
16 io,
17 task::{Poll, Waker},
18 time::Duration,
19};
20
21use compio_buf::BufResult;
22use compio_log::instrument;
23
24mod key;
25pub use key::Key;
26
27pub mod op;
28#[cfg(unix)]
29#[cfg_attr(docsrs, doc(cfg(all())))]
30mod unix;
31#[cfg(unix)]
32use unix::Overlapped;
33
34mod asyncify;
35pub use asyncify::*;
36
37mod fd;
38pub use fd::*;
39
40mod driver_type;
41pub use driver_type::*;
42
43mod buffer_pool;
44pub use buffer_pool::*;
45
46cfg_if::cfg_if! {
47 if #[cfg(windows)] {
48 #[path = "iocp/mod.rs"]
49 mod sys;
50 } else if #[cfg(fusion)] {
51 #[path = "fusion/mod.rs"]
52 mod sys;
53 } else if #[cfg(io_uring)] {
54 #[path = "iour/mod.rs"]
55 mod sys;
56 } else if #[cfg(unix)] {
57 #[path = "poll/mod.rs"]
58 mod sys;
59 }
60}
61
62pub use sys::*;
63
/// Runs a Windows API call and converts its return value into an
/// [`std::io::Result`].
///
/// Forms:
///
/// * `syscall!(BOOL, call)` – the call failed if it returned `0`.
/// * `syscall!(SOCKET, call)` – the call failed if it returned anything but `0`.
/// * `syscall!(HANDLE, call)` – the call failed if it returned
///   `INVALID_HANDLE_VALUE`.
/// * `syscall!(call, <op> <value>)` – the call failed if `ret <op> value` holds.
///
/// On failure the error is taken from [`std::io::Error::last_os_error`]; on
/// success the raw return value is handed back in `Ok`.
#[cfg(windows)]
#[macro_export]
#[doc(hidden)]
macro_rules! syscall {
    (BOOL, $call:expr) => {
        $crate::syscall!($call, == 0)
    };
    (SOCKET, $call:expr) => {
        $crate::syscall!($call, != 0)
    };
    (HANDLE, $call:expr) => {
        $crate::syscall!($call, == ::windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE)
    };
    // General form: every shorthand above funnels into this arm.
    ($call:expr, $cmp:tt $failure:expr) => {{
        // The caller may already be inside an `unsafe` block.
        #[allow(unused_unsafe)]
        let ret = unsafe { $call };
        let failed = ret $cmp $failure;
        if failed {
            Err(::std::io::Error::last_os_error())
        } else {
            Ok(ret)
        }
    }};
}
87
/// Runs a libc-style call and converts its return value into an
/// [`std::io::Result`] (or a poll/decision value built on top of it).
///
/// Forms:
///
/// * `syscall!(call)` – `-1` becomes `Err(io::Error::last_os_error())`, any
///   other value is returned in `Ok`.
/// * `syscall!(break call)` – repeats `call` while it fails with
///   `ErrorKind::Interrupted`. Success yields `Poll::Ready(Ok(ret as usize))`,
///   `ErrorKind::WouldBlock` or `EINPROGRESS` yields `Poll::Pending`, and any
///   other error yields `Poll::Ready(Err(..))`.
/// * `syscall!(call, variant(fd))` – like the `break` form, but mapped onto
///   `sys::Decision`: a finished call becomes `Decision::Completed(ret)`, a
///   pending one becomes `Decision::variant(fd)`, and errors are returned as
///   `Err`.
#[cfg(unix)]
#[macro_export]
#[doc(hidden)]
macro_rules! syscall {
    // Must stay ahead of the plain form: `break <expr>` is itself an expression.
    (break $call:expr) => {
        loop {
            match $crate::syscall!($call) {
                Ok(ret) => break ::std::task::Poll::Ready(Ok(ret as usize)),
                Err(err) => {
                    let not_ready = err.kind() == ::std::io::ErrorKind::WouldBlock
                        || err.raw_os_error() == Some(::libc::EINPROGRESS);
                    if not_ready {
                        break ::std::task::Poll::Pending;
                    }
                    if err.kind() != ::std::io::ErrorKind::Interrupted {
                        break ::std::task::Poll::Ready(Err(err));
                    }
                    // Interrupted: fall through and issue the call again.
                }
            }
        }
    };
    ($call:expr, $decision:ident($fd:expr)) => {
        match $crate::syscall!(break $call) {
            ::std::task::Poll::Ready(Ok(done)) => Ok($crate::sys::Decision::Completed(done)),
            ::std::task::Poll::Ready(Err(err)) => Err(err),
            ::std::task::Poll::Pending => Ok($crate::sys::Decision::$decision($fd)),
        }
    };
    ($call:expr) => {{
        // The caller may already be inside an `unsafe` block.
        #[allow(unused_unsafe)]
        let ret = unsafe { $call };
        if ret != -1 {
            Ok(ret)
        } else {
            Err(::std::io::Error::last_os_error())
        }
    }};
}
121
/// Implements the raw descriptor/handle traits for a wrapper type by
/// forwarding every call to one of its fields.
///
/// Arguments:
///
/// * `$t` – the wrapper type that receives the impls.
/// * `$it` – the type parameter used for `ToSharedFd<$it>` / `SharedFd<$it>`.
/// * `$inner` – the field the calls are forwarded to. The `From*` impls build
///   the wrapper as `Self { $inner: .. }`, so it has to be the only field.
///
/// Forms:
///
/// * `impl_raw_fd!(T, I, field)` – `AsRawFd` and `ToSharedFd<I>` everywhere,
///   plus `AsFd` and `FromRawFd` on Unix.
/// * `impl_raw_fd!(T, I, field, file)` – the above, plus `FromRawHandle`,
///   `AsHandle` and `AsRawHandle` on Windows.
/// * `impl_raw_fd!(T, I, field, socket)` – the above, plus `FromRawSocket`,
///   `AsSocket` and `AsRawSocket` on Windows.
#[macro_export]
#[doc(hidden)]
macro_rules! impl_raw_fd {
    ($t:ty, $it:ty, $inner:ident) => {
        impl $crate::AsRawFd for $t {
            fn as_raw_fd(&self) -> $crate::RawFd {
                self.$inner.as_raw_fd()
            }
        }
        #[cfg(unix)]
        impl std::os::fd::AsFd for $t {
            fn as_fd(&self) -> std::os::fd::BorrowedFd<'_> {
                self.$inner.as_fd()
            }
        }
        #[cfg(unix)]
        impl std::os::fd::FromRawFd for $t {
            unsafe fn from_raw_fd(fd: $crate::RawFd) -> Self {
                Self {
                    $inner: std::os::fd::FromRawFd::from_raw_fd(fd),
                }
            }
        }
        impl $crate::ToSharedFd<$it> for $t {
            fn to_shared_fd(&self) -> $crate::SharedFd<$it> {
                self.$inner.to_shared_fd()
            }
        }
    };
    ($t:ty, $it:ty, $inner:ident, file) => {
        $crate::impl_raw_fd!($t, $it, $inner);
        #[cfg(windows)]
        impl std::os::windows::io::FromRawHandle for $t {
            unsafe fn from_raw_handle(handle: std::os::windows::io::RawHandle) -> Self {
                Self {
                    $inner: std::os::windows::io::FromRawHandle::from_raw_handle(handle),
                }
            }
        }
        #[cfg(windows)]
        impl std::os::windows::io::AsHandle for $t {
            // The borrow is tied to `&self`; spelled out to match `BorrowedFd<'_>` above.
            fn as_handle(&self) -> std::os::windows::io::BorrowedHandle<'_> {
                self.$inner.as_handle()
            }
        }
        #[cfg(windows)]
        impl std::os::windows::io::AsRawHandle for $t {
            fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
                self.$inner.as_raw_handle()
            }
        }
    };
    ($t:ty, $it:ty, $inner:ident, socket) => {
        $crate::impl_raw_fd!($t, $it, $inner);
        #[cfg(windows)]
        impl std::os::windows::io::FromRawSocket for $t {
            unsafe fn from_raw_socket(sock: std::os::windows::io::RawSocket) -> Self {
                Self {
                    $inner: std::os::windows::io::FromRawSocket::from_raw_socket(sock),
                }
            }
        }
        #[cfg(windows)]
        impl std::os::windows::io::AsSocket for $t {
            // The borrow is tied to `&self`; spelled out to match `BorrowedFd<'_>` above.
            fn as_socket(&self) -> std::os::windows::io::BorrowedSocket<'_> {
                self.$inner.as_socket()
            }
        }
        #[cfg(windows)]
        impl std::os::windows::io::AsRawSocket for $t {
            fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
                self.$inner.as_raw_socket()
            }
        }
    };
}
198
/// Two-state outcome: either something still pending (identified by `K`) or a
/// finished value (`R`).
pub enum PushEntry<K, R> {
    /// Not finished yet; carries the key.
    Pending(K),
    /// Finished; carries the result.
    Ready(R),
}

impl<K, R> PushEntry<K, R> {
    /// Returns `true` when this is the [`PushEntry::Ready`] variant.
    pub const fn is_ready(&self) -> bool {
        match self {
            Self::Ready(_) => true,
            Self::Pending(_) => false,
        }
    }

    /// Consumes the entry, yielding the result if it is ready and `None`
    /// (dropping the key) otherwise.
    pub fn take_ready(self) -> Option<R> {
        if let Self::Ready(value) = self {
            Some(value)
        } else {
            None
        }
    }

    /// Transforms the key of a pending entry with `f`; a ready entry passes
    /// through untouched and `f` is never called.
    pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
        let key = match self {
            Self::Pending(key) => key,
            Self::Ready(value) => return PushEntry::Ready(value),
        };
        PushEntry::Pending(f(key))
    }

    /// Transforms the result of a ready entry with `f`; a pending entry passes
    /// through untouched and `f` is never called.
    pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
        let value = match self {
            Self::Ready(value) => value,
            Self::Pending(key) => return PushEntry::Pending(key),
        };
        PushEntry::Ready(f(value))
    }
}
237
238pub struct Proactor {
241 driver: Driver,
242}
243
244impl Proactor {
245 pub fn new() -> io::Result<Self> {
247 Self::builder().build()
248 }
249
250 pub fn builder() -> ProactorBuilder {
252 ProactorBuilder::new()
253 }
254
255 fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
256 Ok(Self {
257 driver: Driver::new(builder)?,
258 })
259 }
260
261 pub fn driver_type(&self) -> DriverType {
263 self.driver.driver_type()
264 }
265
266 pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
274 self.driver.attach(fd)
275 }
276
277 pub fn cancel<T: OpCode>(&mut self, mut op: Key<T>) -> Option<BufResult<usize, T>> {
283 instrument!(compio_log::Level::DEBUG, "cancel", ?op);
284 if op.set_cancelled() {
285 Some(unsafe { op.into_inner() })
287 } else {
288 self.driver
289 .cancel(&mut unsafe { Key::<dyn OpCode>::new_unchecked(op.user_data()) });
290 None
291 }
292 }
293
294 pub fn push<T: OpCode + 'static>(&mut self, op: T) -> PushEntry<Key<T>, BufResult<usize, T>> {
297 let mut op = self.driver.create_op(op);
298 match self
299 .driver
300 .push(&mut unsafe { Key::<dyn OpCode>::new_unchecked(op.user_data()) })
301 {
302 Poll::Pending => PushEntry::Pending(op),
303 Poll::Ready(res) => {
304 op.set_result(res);
305 PushEntry::Ready(unsafe { op.into_inner() })
307 }
308 }
309 }
310
311 pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
315 unsafe { self.driver.poll(timeout) }
316 }
317
318 pub fn pop<T>(&mut self, op: Key<T>) -> PushEntry<Key<T>, (BufResult<usize, T>, u32)> {
324 instrument!(compio_log::Level::DEBUG, "pop", ?op);
325 if op.has_result() {
326 let flags = op.flags();
327 PushEntry::Ready((unsafe { op.into_inner() }, flags))
329 } else {
330 PushEntry::Pending(op)
331 }
332 }
333
334 pub fn update_waker<T>(&mut self, op: &mut Key<T>, waker: Waker) {
336 op.set_waker(waker);
337 }
338
339 pub fn handle(&self) -> NotifyHandle {
341 self.driver.handle()
342 }
343
344 pub fn create_buffer_pool(
351 &mut self,
352 buffer_len: u16,
353 buffer_size: usize,
354 ) -> io::Result<BufferPool> {
355 self.driver.create_buffer_pool(buffer_len, buffer_size)
356 }
357
358 pub unsafe fn release_buffer_pool(&mut self, buffer_pool: BufferPool) -> io::Result<()> {
365 self.driver.release_buffer_pool(buffer_pool)
366 }
367}
368
369impl AsRawFd for Proactor {
370 fn as_raw_fd(&self) -> RawFd {
371 self.driver.as_raw_fd()
372 }
373}
374
375#[derive(Debug)]
377pub(crate) struct Entry {
378 user_data: usize,
379 result: io::Result<usize>,
380 flags: u32,
381}
382
383impl Entry {
384 pub(crate) fn new(user_data: usize, result: io::Result<usize>) -> Self {
385 Self {
386 user_data,
387 result,
388 flags: 0,
389 }
390 }
391
392 #[cfg(io_uring)]
393 pub(crate) fn set_flags(&mut self, flags: u32) {
395 self.flags = flags;
396 }
397
398 pub fn user_data(&self) -> usize {
400 self.user_data
401 }
402
403 pub fn flags(&self) -> u32 {
404 self.flags
405 }
406
407 pub fn into_result(self) -> io::Result<usize> {
409 self.result
410 }
411
412 pub unsafe fn notify(self) {
414 let user_data = self.user_data();
415 let mut op = Key::<()>::new_unchecked(user_data);
416 op.set_flags(self.flags());
417 if op.set_result(self.into_result()) {
418 let _ = op.into_box();
420 }
421 }
422}
423
424#[derive(Debug, Clone)]
425enum ThreadPoolBuilder {
426 Create { limit: usize, recv_limit: Duration },
427 Reuse(AsyncifyPool),
428}
429
430impl Default for ThreadPoolBuilder {
431 fn default() -> Self {
432 Self::new()
433 }
434}
435
436impl ThreadPoolBuilder {
437 pub fn new() -> Self {
438 Self::Create {
439 limit: 256,
440 recv_limit: Duration::from_secs(60),
441 }
442 }
443
444 pub fn create_or_reuse(&self) -> AsyncifyPool {
445 match self {
446 Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit),
447 Self::Reuse(pool) => pool.clone(),
448 }
449 }
450}
451
452#[derive(Debug, Clone)]
454pub struct ProactorBuilder {
455 capacity: u32,
456 pool_builder: ThreadPoolBuilder,
457 sqpoll_idle: Option<Duration>,
458 coop_taskrun: bool,
459 taskrun_flag: bool,
460 eventfd: Option<RawFd>,
461 driver_type: Option<DriverType>,
462}
463
464unsafe impl Send for ProactorBuilder {}
466unsafe impl Sync for ProactorBuilder {}
467
468impl Default for ProactorBuilder {
469 fn default() -> Self {
470 Self::new()
471 }
472}
473
474impl ProactorBuilder {
475 pub fn new() -> Self {
477 Self {
478 capacity: 1024,
479 pool_builder: ThreadPoolBuilder::new(),
480 sqpoll_idle: None,
481 coop_taskrun: false,
482 taskrun_flag: false,
483 eventfd: None,
484 driver_type: None,
485 }
486 }
487
488 pub fn capacity(&mut self, capacity: u32) -> &mut Self {
491 self.capacity = capacity;
492 self
493 }
494
495 pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
505 if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder {
506 *limit = value;
507 }
508 self
509 }
510
511 pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
516 if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder {
517 *recv_limit = timeout;
518 }
519 self
520 }
521
522 pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
524 self.pool_builder = ThreadPoolBuilder::Reuse(pool);
525 self
526 }
527
528 pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
531 self.reuse_thread_pool(self.create_or_get_thread_pool());
532 self
533 }
534
535 pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
537 self.pool_builder.create_or_reuse()
538 }
539
540 pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self {
549 self.sqpoll_idle = Some(idle);
550 self
551 }
552
553 pub fn coop_taskrun(&mut self, enable: bool) -> &mut Self {
563 self.coop_taskrun = enable;
564 self
565 }
566
567 pub fn taskrun_flag(&mut self, enable: bool) -> &mut Self {
576 self.taskrun_flag = enable;
577 self
578 }
579
580 pub fn register_eventfd(&mut self, fd: RawFd) -> &mut Self {
586 self.eventfd = Some(fd);
587 self
588 }
589
590 pub fn driver_type(&mut self, t: DriverType) -> &mut Self {
593 self.driver_type = Some(t);
594 self
595 }
596
597 pub fn build(&self) -> io::Result<Proactor> {
599 Proactor::with_builder(self)
600 }
601}