1#![cfg_attr(docsrs, feature(doc_cfg))]
6#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
7#![warn(missing_docs)]
8#![deny(rustdoc::broken_intra_doc_links)]
9#![doc(
10 html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
11)]
12#![doc(
13 html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
14)]
15
16use std::{
17 io,
18 task::{Poll, Waker},
19 time::Duration,
20};
21
22use compio_buf::BufResult;
23use compio_log::instrument;
24
25mod macros;
26
27mod key;
28pub use key::Key;
29
30mod asyncify;
31pub use asyncify::*;
32
33pub mod op;
34
35mod fd;
36pub use fd::*;
37
38mod driver_type;
39pub use driver_type::*;
40
41mod buffer_pool;
42pub use buffer_pool::*;
43
44mod sys;
45pub use sys::*;
46
47use crate::key::ErasedKey;
48
49mod sys_slice;
50
/// Outcome of submitting or polling an operation: it is either still in
/// flight or has already produced a result.
///
/// `K` is the handle kept while the operation is outstanding and `R` is the
/// value available once it has completed. [`Proactor::push`] and
/// [`Proactor::pop`] both return this type.
#[derive(Debug)]
pub enum PushEntry<K, R> {
    /// The operation has not completed yet; carries the handle to check later.
    Pending(K),
    /// The operation has completed; carries its result.
    Ready(R),
}
59
60impl<K, R> PushEntry<K, R> {
61 pub const fn is_ready(&self) -> bool {
63 matches!(self, Self::Ready(_))
64 }
65
66 pub fn take_ready(self) -> Option<R> {
68 match self {
69 Self::Pending(_) => None,
70 Self::Ready(res) => Some(res),
71 }
72 }
73
74 pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
76 match self {
77 Self::Pending(k) => PushEntry::Pending(f(k)),
78 Self::Ready(r) => PushEntry::Ready(r),
79 }
80 }
81
82 pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
84 match self {
85 Self::Pending(k) => PushEntry::Pending(k),
86 Self::Ready(r) => PushEntry::Ready(f(r)),
87 }
88 }
89}
90
91pub struct Proactor {
94 driver: Driver,
95}
96
97impl Proactor {
98 pub fn new() -> io::Result<Self> {
100 Self::builder().build()
101 }
102
103 pub fn builder() -> ProactorBuilder {
105 ProactorBuilder::new()
106 }
107
108 fn with_builder(builder: &ProactorBuilder) -> io::Result<Self> {
109 Ok(Self {
110 driver: Driver::new(builder)?,
111 })
112 }
113
114 pub fn default_extra(&self) -> Extra {
116 self.driver.default_extra().into()
117 }
118
119 pub fn driver_type(&self) -> DriverType {
121 self.driver.driver_type()
122 }
123
124 pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
132 self.driver.attach(fd)
133 }
134
135 pub fn cancel<T: OpCode>(&mut self, key: Key<T>) -> Option<BufResult<usize, T>> {
140 instrument!(compio_log::Level::DEBUG, "cancel", ?key);
141 key.set_cancelled();
142 if key.has_result() {
143 Some(key.take_result())
144 } else {
145 self.driver.cancel(key);
146 None
147 }
148 }
149
150 pub fn push<T: sys::OpCode + 'static>(
153 &mut self,
154 op: T,
155 ) -> PushEntry<Key<T>, BufResult<usize, T>> {
156 self.push_with_extra(op, self.default_extra())
157 }
158
159 pub fn push_with_extra<T: sys::OpCode + 'static>(
162 &mut self,
163 op: T,
164 extra: Extra,
165 ) -> PushEntry<Key<T>, BufResult<usize, T>> {
166 let key = Key::new(op, extra);
167 match self.driver.push(key.clone().erase()) {
168 Poll::Pending => PushEntry::Pending(key),
169 Poll::Ready(res) => {
170 key.set_result(res);
171 PushEntry::Ready(key.take_result())
172 }
173 }
174 }
175
176 pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
180 self.driver.poll(timeout)
181 }
182
183 pub fn pop<T>(&mut self, key: Key<T>) -> PushEntry<Key<T>, BufResult<usize, T>> {
189 instrument!(compio_log::Level::DEBUG, "pop", ?key);
190 if key.has_result() {
191 PushEntry::Ready(key.take_result())
192 } else {
193 PushEntry::Pending(key)
194 }
195 }
196
197 pub fn pop_with_extra<T>(
204 &mut self,
205 key: Key<T>,
206 ) -> PushEntry<Key<T>, (BufResult<usize, T>, Extra)> {
207 instrument!(compio_log::Level::DEBUG, "pop", ?key);
208 if key.has_result() {
209 let extra = key.swap_extra(self.default_extra());
210 let res = key.take_result();
211 PushEntry::Ready((res, extra))
212 } else {
213 PushEntry::Pending(key)
214 }
215 }
216
217 pub fn update_waker<T>(&mut self, op: &mut Key<T>, waker: &Waker) {
219 op.set_waker(waker);
220 }
221
222 pub fn waker(&self) -> Waker {
224 self.driver.waker()
225 }
226
227 pub fn create_buffer_pool(
234 &mut self,
235 buffer_len: u16,
236 buffer_size: usize,
237 ) -> io::Result<BufferPool> {
238 self.driver.create_buffer_pool(buffer_len, buffer_size)
239 }
240
241 pub unsafe fn release_buffer_pool(&mut self, buffer_pool: BufferPool) -> io::Result<()> {
248 unsafe { self.driver.release_buffer_pool(buffer_pool) }
249 }
250
251 pub fn register_personality(&self) -> io::Result<u16> {
263 fn unsupported() -> io::Error {
264 io::Error::new(
265 io::ErrorKind::Unsupported,
266 "Personality is only supported on io-uring driver",
267 )
268 }
269
270 #[cfg(io_uring)]
271 match self.driver.as_iour() {
272 Some(iour) => iour.register_personality(),
273 None => Err(unsupported()),
274 }
275
276 #[cfg(not(io_uring))]
277 Err(unsupported())
278 }
279
280 pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
289 fn unsupported(_: u16) -> io::Error {
290 io::Error::new(
291 io::ErrorKind::Unsupported,
292 "Personality is only supported on io-uring driver",
293 )
294 }
295
296 #[cfg(io_uring)]
297 match self.driver.as_iour() {
298 Some(iour) => iour.unregister_personality(personality),
299 None => Err(unsupported(personality)),
300 }
301
302 #[cfg(not(io_uring))]
303 Err(unsupported(personality))
304 }
305}
306
307impl AsRawFd for Proactor {
308 fn as_raw_fd(&self) -> RawFd {
309 self.driver.as_raw_fd()
310 }
311}
312
313#[derive(Debug)]
318pub(crate) struct Entry {
319 key: ErasedKey,
320 result: io::Result<usize>,
321
322 #[cfg(io_uring)]
323 flags: u32,
324}
325
326unsafe impl Send for Entry {}
327unsafe impl Sync for Entry {}
328
329impl Entry {
330 pub(crate) fn new(key: ErasedKey, result: io::Result<usize>) -> Self {
331 #[cfg(not(io_uring))]
332 {
333 Self { key, result }
334 }
335 #[cfg(io_uring)]
336 {
337 Self {
338 key,
339 result,
340 flags: 0,
341 }
342 }
343 }
344
345 #[allow(dead_code)]
346 pub fn user_data(&self) -> usize {
347 self.key.as_raw()
348 }
349
350 #[allow(dead_code)]
351 pub fn into_key(self) -> ErasedKey {
352 self.key
353 }
354
355 #[cfg(io_uring)]
356 pub fn flags(&self) -> u32 {
357 self.flags
358 }
359
360 #[cfg(io_uring)]
361 pub(crate) fn set_flags(&mut self, flags: u32) {
363 self.flags = flags;
364 }
365
366 pub fn notify(self) {
367 #[cfg(io_uring)]
368 self.key.borrow().extra_mut().set_flags(self.flags());
369 self.key.set_result(self.result);
370 }
371}
372
373#[derive(Debug, Clone)]
374enum ThreadPoolBuilder {
375 Create { limit: usize, recv_limit: Duration },
376 Reuse(AsyncifyPool),
377}
378
379impl Default for ThreadPoolBuilder {
380 fn default() -> Self {
381 Self::new()
382 }
383}
384
385impl ThreadPoolBuilder {
386 pub fn new() -> Self {
387 Self::Create {
388 limit: 256,
389 recv_limit: Duration::from_secs(60),
390 }
391 }
392
393 pub fn create_or_reuse(&self) -> AsyncifyPool {
394 match self {
395 Self::Create { limit, recv_limit } => AsyncifyPool::new(*limit, *recv_limit),
396 Self::Reuse(pool) => pool.clone(),
397 }
398 }
399}
400
401#[derive(Debug, Clone)]
403pub struct ProactorBuilder {
404 capacity: u32,
405 pool_builder: ThreadPoolBuilder,
406 sqpoll_idle: Option<Duration>,
407 coop_taskrun: bool,
408 taskrun_flag: bool,
409 eventfd: Option<RawFd>,
410 driver_type: Option<DriverType>,
411}
412
413unsafe impl Send for ProactorBuilder {}
415unsafe impl Sync for ProactorBuilder {}
416
417impl Default for ProactorBuilder {
418 fn default() -> Self {
419 Self::new()
420 }
421}
422
423impl ProactorBuilder {
424 pub fn new() -> Self {
426 Self {
427 capacity: 1024,
428 pool_builder: ThreadPoolBuilder::new(),
429 sqpoll_idle: None,
430 coop_taskrun: false,
431 taskrun_flag: false,
432 eventfd: None,
433 driver_type: None,
434 }
435 }
436
437 pub fn capacity(&mut self, capacity: u32) -> &mut Self {
440 self.capacity = capacity;
441 self
442 }
443
444 pub fn thread_pool_limit(&mut self, value: usize) -> &mut Self {
454 if let ThreadPoolBuilder::Create { limit, .. } = &mut self.pool_builder {
455 *limit = value;
456 }
457 self
458 }
459
460 pub fn thread_pool_recv_timeout(&mut self, timeout: Duration) -> &mut Self {
465 if let ThreadPoolBuilder::Create { recv_limit, .. } = &mut self.pool_builder {
466 *recv_limit = timeout;
467 }
468 self
469 }
470
471 pub fn reuse_thread_pool(&mut self, pool: AsyncifyPool) -> &mut Self {
473 self.pool_builder = ThreadPoolBuilder::Reuse(pool);
474 self
475 }
476
477 pub fn force_reuse_thread_pool(&mut self) -> &mut Self {
480 self.reuse_thread_pool(self.create_or_get_thread_pool());
481 self
482 }
483
484 pub fn create_or_get_thread_pool(&self) -> AsyncifyPool {
486 self.pool_builder.create_or_reuse()
487 }
488
489 pub fn sqpoll_idle(&mut self, idle: Duration) -> &mut Self {
498 self.sqpoll_idle = Some(idle);
499 self
500 }
501
502 pub fn coop_taskrun(&mut self, enable: bool) -> &mut Self {
512 self.coop_taskrun = enable;
513 self
514 }
515
516 pub fn taskrun_flag(&mut self, enable: bool) -> &mut Self {
525 self.taskrun_flag = enable;
526 self
527 }
528
529 pub fn register_eventfd(&mut self, fd: RawFd) -> &mut Self {
535 self.eventfd = Some(fd);
536 self
537 }
538
539 pub fn driver_type(&mut self, t: DriverType) -> &mut Self {
542 self.driver_type = Some(t);
543 self
544 }
545
546 pub fn build(&self) -> io::Result<Proactor> {
548 Proactor::with_builder(self)
549 }
550}