1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
5#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
6#![warn(missing_docs)]
7
8#[cfg(all(
9 target_os = "linux",
10 not(feature = "io-uring"),
11 not(feature = "polling")
12))]
13compile_error!("You must choose at least one of these features: [\"io-uring\", \"polling\"]");
14
15use std::{
16 io,
17 task::{Poll, Waker},
18 time::Duration,
19};
20
21use compio_buf::BufResult;
22use compio_log::instrument;
23
24mod key;
25pub use key::Key;
26
27pub mod op;
28#[cfg(unix)]
29#[cfg_attr(docsrs, doc(cfg(all())))]
30mod unix;
31#[cfg(unix)]
32use unix::Overlapped;
33
34mod asyncify;
35pub use asyncify::*;
36
37mod fd;
38pub use fd::*;
39
40mod driver_type;
41pub use driver_type::*;
42
43mod buffer_pool;
44pub use buffer_pool::*;
45
46cfg_if::cfg_if! {
47 if #[cfg(windows)] {
48 #[path = "iocp/mod.rs"]
49 mod sys;
50 } else if #[cfg(fusion)] {
51 #[path = "fusion/mod.rs"]
52 mod sys;
53 } else if #[cfg(io_uring)] {
54 #[path = "iour/mod.rs"]
55 mod sys;
56 } else if #[cfg(unix)] {
57 #[path = "poll/mod.rs"]
58 mod sys;
59 }
60}
61
62pub use sys::*;
63
64#[cfg(windows)]
65#[macro_export]
66#[doc(hidden)]
67macro_rules! syscall {
68 (BOOL, $e:expr) => {
69 $crate::syscall!($e, == 0)
70 };
71 (SOCKET, $e:expr) => {
72 $crate::syscall!($e, != 0)
73 };
74 (HANDLE, $e:expr) => {
75 $crate::syscall!($e, == ::windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE)
76 };
77 ($e:expr, $op: tt $rhs: expr) => {{
78 #[allow(unused_unsafe)]
79 let res = unsafe { $e };
80 if res $op $rhs {
81 Err(::std::io::Error::last_os_error())
82 } else {
83 Ok(res)
84 }
85 }};
86}
87
88#[cfg(unix)]
90#[macro_export]
91#[doc(hidden)]
92macro_rules! syscall {
93 (break $e:expr) => {
94 loop {
95 match $crate::syscall!($e) {
96 Ok(fd) => break ::std::task::Poll::Ready(Ok(fd as usize)),
97 Err(e) if e.kind() == ::std::io::ErrorKind::WouldBlock || e.raw_os_error() == Some(::libc::EINPROGRESS)
98 => break ::std::task::Poll::Pending,
99 Err(e) if e.kind() == ::std::io::ErrorKind::Interrupted => {},
100 Err(e) => break ::std::task::Poll::Ready(Err(e)),
101 }
102 }
103 };
104 ($e:expr, $f:ident($fd:expr)) => {
105 match $crate::syscall!(break $e) {
106 ::std::task::Poll::Pending => Ok($crate::sys::Decision::$f($fd)),
107 ::std::task::Poll::Ready(Ok(res)) => Ok($crate::sys::Decision::Completed(res)),
108 ::std::task::Poll::Ready(Err(e)) => Err(e),
109 }
110 };
111 ($e:expr) => {{
112 #[allow(unused_unsafe)]
113 let res = unsafe { $e };
114 if res == -1 {
115 Err(::std::io::Error::last_os_error())
116 } else {
117 Ok(res)
118 }
119 }};
120}
121
122#[macro_export]
123#[doc(hidden)]
124macro_rules! impl_raw_fd {
125 ($t:ty, $it:ty, $inner:ident) => {
126 impl $crate::AsRawFd for $t {
127 fn as_raw_fd(&self) -> $crate::RawFd {
128 self.$inner.as_raw_fd()
129 }
130 }
131 #[cfg(unix)]
132 impl std::os::fd::AsFd for $t {
133 fn as_fd(&self) -> std::os::fd::BorrowedFd<'_> {
134 self.$inner.as_fd()
135 }
136 }
137 #[cfg(unix)]
138 impl std::os::fd::FromRawFd for $t {
139 unsafe fn from_raw_fd(fd: $crate::RawFd) -> Self {
140 Self {
141 $inner: std::os::fd::FromRawFd::from_raw_fd(fd),
142 }
143 }
144 }
145 impl $crate::ToSharedFd<$it> for $t {
146 fn to_shared_fd(&self) -> $crate::SharedFd<$it> {
147 self.$inner.to_shared_fd()
148 }
149 }
150 };
151 ($t:ty, $it:ty, $inner:ident,file) => {
152 $crate::impl_raw_fd!($t, $it, $inner);
153 #[cfg(windows)]
154 impl std::os::windows::io::FromRawHandle for $t {
155 unsafe fn from_raw_handle(handle: std::os::windows::io::RawHandle) -> Self {
156 Self {
157 $inner: std::os::windows::io::FromRawHandle::from_raw_handle(handle),
158 }
159 }
160 }
161 #[cfg(windows)]
162 impl std::os::windows::io::AsHandle for $t {
163 fn as_handle(&self) -> std::os::windows::io::BorrowedHandle {
164 self.$inner.as_handle()
165 }
166 }
167 #[cfg(windows)]
168 impl std::os::windows::io::AsRawHandle for $t {
169 fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
170 self.$inner.as_raw_handle()
171 }
172 }
173 };
174 ($t:ty, $it:ty, $inner:ident,socket) => {
175 $crate::impl_raw_fd!($t, $it, $inner);
176 #[cfg(windows)]
177 impl std::os::windows::io::FromRawSocket for $t {
178 unsafe fn from_raw_socket(sock: std::os::windows::io::RawSocket) -> Self {
179 Self {
180 $inner: std::os::windows::io::FromRawSocket::from_raw_socket(sock),
181 }
182 }
183 }
184 #[cfg(windows)]
185 impl std::os::windows::io::AsSocket for $t {
186 fn as_socket(&self) -> std::os::windows::io::BorrowedSocket {
187 self.$inner.as_socket()
188 }
189 }
190 #[cfg(windows)]
191 impl std::os::windows::io::AsRawSocket for $t {
192 fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
193 self.$inner.as_raw_socket()
194 }
195 }
196 };
197}
198
199pub enum PushEntry<K, R> {
201 Pending(K),
203 Ready(R),
205}
206
207impl<K, R> PushEntry<K, R> {
208 pub const fn is_ready(&self) -> bool {
210 matches!(self, Self::Ready(_))
211 }
212
213 pub fn take_ready(self) -> Option<R> {
215 match self {
216 Self::Pending(_) => None,
217 Self::Ready(res) => Some(res),
218 }
219 }
220
221 pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
223 match self {
224 Self::Pending(k) => PushEntry::Pending(f(k)),
225 Self::Ready(r) => PushEntry::Ready(r),
226 }
227 }
228
229 pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
231 match self {
232 Self::Pending(k) => PushEntry::Pending(k),
233 Self::Ready(r) => PushEntry::Ready(f(r)),
234 }
235 }
236}
237
238pub struct Proactor {
241 driver: Driver,
242}
243
244impl Proactor {
245 pub fn new() -> io::Result<Self> {
247 Self::builder().build()
248 }
249
250 pub fn builder() -> ProactorBuilder {
252 ProactorBuilder::new()
253 }
254
255 fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
256 Ok(Self {
257 driver: Driver::new(builder)?,
258 })
259 }
260
261 pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
269 self.driver.attach(fd)
270 }
271
272 pub fn cancel<T: OpCode>(&mut self, mut op: Key<T>) -> Option<BufResult<usize, T>> {
278 instrument!(compio_log::Level::DEBUG, "cancel", ?op);
279 if op.set_cancelled() {
280 Some(unsafe { op.into_inner() })
282 } else {
283 self.driver
284 .cancel(&mut unsafe { Key::<dyn OpCode>::new_unchecked(op.user_data()) });
285 None
286 }
287 }
288
289 pub fn push<T: OpCode + 'static>(&mut self, op: T) -> PushEntry<Key<T>, BufResult<usize, T>> {
292 let mut op = self.driver.create_op(op);
293 match self
294 .driver
295 .push(&mut unsafe { Key::<dyn OpCode>::new_unchecked(op.user_data()) })
296 {
297 Poll::Pending => PushEntry::Pending(op),
298 Poll::Ready(res) => {
299 op.set_result(res);
300 PushEntry::Ready(unsafe { op.into_inner() })
302 }
303 }
304 }
305
306 pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
310 unsafe { self.driver.poll(timeout) }
311 }
312
313 pub fn pop<T>(&mut self, op: Key<T>) -> PushEntry<Key<T>, (BufResult<usize, T>, u32)> {
319 instrument!(compio_log::Level::DEBUG, "pop", ?op);
320 if op.has_result() {
321 let flags = op.flags();
322 PushEntry::Ready((unsafe { op.into_inner() }, flags))
324 } else {
325 PushEntry::Pending(op)
326 }
327 }
328
329 pub fn update_waker<T>(&mut self, op: &mut Key<T>, waker: Waker) {
331 op.set_waker(waker);
332 }
333
334 pub fn handle(&self) -> NotifyHandle {
336 self.driver.handle()
337 }
338
339 pub fn create_buffer_pool(
346 &mut self,
347 buffer_len: u16,
348 buffer_size: usize,
349 ) -> io::Result<BufferPool> {
350 self.driver.create_buffer_pool(buffer_len, buffer_size)
351 }
352
353 pub unsafe fn release_buffer_pool(&mut self, buffer_pool: BufferPool) -> io::Result<()> {
360 self.driver.release_buffer_pool(buffer_pool)
361 }
362}
363
364impl AsRawFd for Proactor {
365 fn as_raw_fd(&self) -> RawFd {
366 self.driver.as_raw_fd()
367 }
368}
369
370#[derive(Debug)]
372pub(crate) struct Entry {
373 user_data: usize,
374 result: io::Result<usize>,
375 flags: u32,
376}
377
378impl Entry {
379 pub(crate) fn new(user_data: usize, result: io::Result<usize>) -> Self {
380 Self {
381 user_data,
382 result,
383 flags: 0,
384 }
385 }
386
387 #[cfg(io_uring)]
388 pub(crate) fn set_flags(&mut self, flags: u32) {
390 self.flags = flags;
391 }
392
393 pub fn user_data(&self) -> usize {
395 self.user_data
396 }
397
398 pub fn flags(&self) -> u32 {
399 self.flags
400 }
401
402 pub fn into_result(self) -> io::Result<usize> {
404 self.result
405 }
406
407 pub unsafe fn notify(self) {
409 let user_data = self.user_data();
410 let mut op = Key::<()>::new_unchecked(user_data);
411 op.set_flags(self.flags());
412 if op.set_result(self.into_result()) {
413 let _ = op.into_box();
415 }
416 }
417}
418
419#[derive(Debug, Clone)]
420enum ThreadPoolBuilder {
421 Create { limit: usize, recv_limit: Duration },
422 Reuse(AsyncifyPool),
423}
424
425impl Default for ThreadPoolBuilder {
426 fn default() -> Self {
427 Self::new()
428 }
429}
430
431impl ThreadPoolBuilder {
432 pub fn new() -> Self {
433 Self::Create {
434 limit: 256,
435 recv_limit: Duration::from_secs(60),
436 }
437 }
438
439 pub fn create_or_reuse(&self) -> AsyncifyPool {
440 match self {
441 Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit),
442 Self::Reuse(pool) => pool.clone(),
443 }
444 }
445}
446
447#[derive(Debug, Clone)]
449pub struct ProactorBuilder {
450 capacity: u32,
451 pool_builder: ThreadPoolBuilder,
452 sqpoll_idle: Option<Duration>,
453 coop_taskrun: bool,
454 taskrun_flag: bool,
455 eventfd: Option<RawFd>,
456}
457
458impl Default for ProactorBuilder {
459 fn default() -> Self {
460 Self::new()
461 }
462}
463
464impl ProactorBuilder {
465 pub fn new() -> Self {
467 Self {
468 capacity: 1024,
469 pool_builder: ThreadPoolBuilder::new(),
470 sqpoll_idle: None,
471 coop_taskrun: false,
472 taskrun_flag: false,
473 eventfd: None,
474 }
475 }
476
477 pub fn capacity(&mut self, capacity: u32) -> &mut Self {
480 self.capacity = capacity;
481 self
482 }
483
484 pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
489 if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder {
490 *limit = value;
491 }
492 self
493 }
494
495 pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
500 if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder {
501 *recv_limit = timeout;
502 }
503 self
504 }
505
506 pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
508 self.pool_builder = ThreadPoolBuilder::Reuse(pool);
509 self
510 }
511
512 pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
515 self.reuse_thread_pool(self.create_or_get_thread_pool());
516 self
517 }
518
519 pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
521 self.pool_builder.create_or_reuse()
522 }
523
524 pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self {
533 self.sqpoll_idle = Some(idle);
534 self
535 }
536
537 pub fn coop_taskrun(&mut self, enable: bool) -> &mut Self {
547 self.coop_taskrun = enable;
548 self
549 }
550
551 pub fn taskrun_flag(&mut self, enable: bool) -> &mut Self {
560 self.taskrun_flag = enable;
561 self
562 }
563
564 pub fn register_eventfd(&mut self, fd: RawFd) -> &mut Self {
570 self.eventfd = Some(fd);
571 self
572 }
573
574 pub fn build(&self) -> io::Result<Proactor> {
576 Proactor::with_builder(self)
577 }
578}