1#![cfg_attr(docsrs, feature(doc_cfg))]
12#![cfg_attr(feature = "current_thread_id", feature(current_thread_id))]
13#![allow(unused_features)]
14#![warn(missing_docs)]
15#![deny(rustdoc::broken_intra_doc_links)]
16#![doc(
17 html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
18)]
19#![doc(
20 html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
21)]
22
23mod affinity;
24mod attacher;
25mod cancel;
26mod future;
27mod waker;
28
29pub mod fd;
30
31#[cfg(feature = "time")]
32pub mod time;
33
34use std::{
35 cell::RefCell,
36 collections::HashSet,
37 fmt::Debug,
38 future::Future,
39 io,
40 ops::Deref,
41 rc::Rc,
42 sync::Arc,
43 task::{Context, Poll, Waker},
44 time::Duration,
45};
46
47use compio_buf::{BufResult, IntoInner};
48use compio_driver::{
49 AsRawFd, Cancel, DriverType, Extra, Key, OpCode, Proactor, ProactorBuilder, PushEntry, RawFd,
50 op::Asyncify,
51};
52pub use compio_driver::{BufferPool, ErrorExt};
53use compio_executor::{Executor, ExecutorConfig};
54pub use compio_executor::{JoinHandle, ResumeUnwind};
55use compio_log::{debug, instrument};
56
57use crate::affinity::bind_to_cpu_set;
58#[cfg(feature = "time")]
59use crate::time::{TimerFuture, TimerKey, TimerRuntime};
60pub use crate::{attacher::*, cancel::CancelToken, future::*, waker::OptWaker};
61
62scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
63
/// Shared panic path for every API that requires an entered runtime.
///
/// Marked `#[cold]` so callers keep their "runtime is set" branch as the
/// expected path.
///
/// # Panics
///
/// Always panics with the message `not in a compio runtime`.
#[cold]
fn not_in_compio_runtime() -> ! {
    panic!("not in a compio runtime")
}
68
69pub struct RuntimeInner {
71 executor: Executor,
72 driver: RefCell<Proactor>,
73 #[cfg(feature = "time")]
74 timer_runtime: RefCell<TimerRuntime>,
75}
76
77#[derive(Clone)]
81pub struct Runtime(Rc<RuntimeInner>);
82
83impl Debug for Runtime {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 let mut s = f.debug_struct("Runtime");
86 s.field("executor", &self.0.executor)
87 .field("driver", &"...")
88 .field("scheduler", &"...");
89 #[cfg(feature = "time")]
90 s.field("timer_runtime", &"...");
91 s.finish()
92 }
93}
94
95impl Deref for Runtime {
96 type Target = RuntimeInner;
97
98 fn deref(&self) -> &Self::Target {
99 &self.0
100 }
101}
102
103impl Runtime {
104 pub fn new() -> io::Result<Self> {
106 Self::builder().build()
107 }
108
109 pub fn builder() -> RuntimeBuilder {
111 RuntimeBuilder::new()
112 }
113
114 pub fn driver_type(&self) -> DriverType {
116 self.driver.borrow().driver_type()
117 }
118
119 pub fn try_with_current<T, F: FnOnce(&Self) -> T>(f: F) -> Result<T, F> {
122 if CURRENT_RUNTIME.is_set() {
123 Ok(CURRENT_RUNTIME.with(f))
124 } else {
125 Err(f)
126 }
127 }
128
129 pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
135 if CURRENT_RUNTIME.is_set() {
136 CURRENT_RUNTIME.with(f)
137 } else {
138 not_in_compio_runtime()
139 }
140 }
141
142 pub fn try_current() -> Option<Self> {
145 if CURRENT_RUNTIME.is_set() {
146 Some(CURRENT_RUNTIME.with(|r| r.clone()))
147 } else {
148 None
149 }
150 }
151
152 pub fn current() -> Self {
158 if CURRENT_RUNTIME.is_set() {
159 CURRENT_RUNTIME.with(|r| r.clone())
160 } else {
161 not_in_compio_runtime()
162 }
163 }
164
165 pub fn enter<T, F: FnOnce() -> T>(&self, f: F) -> T {
168 CURRENT_RUNTIME.set(self, f)
169 }
170
171 pub fn run(&self) -> bool {
177 self.executor.tick()
178 }
179
180 pub fn waker(&self) -> Waker {
184 self.driver.borrow().waker()
185 }
186
187 pub fn opt_waker(&self) -> Arc<OptWaker> {
192 OptWaker::new(self.waker())
193 }
194
195 pub fn block_on<F: Future>(&self, future: F) -> F::Output {
197 self.enter(|| {
198 let opt_waker = self.opt_waker();
199 let waker = Waker::from(opt_waker.clone());
200 let mut context = Context::from_waker(&waker);
201 let mut future = std::pin::pin!(future);
202 loop {
203 if let Poll::Ready(result) = future.as_mut().poll(&mut context) {
204 self.run();
205 return result;
206 }
207 let remaining_tasks = self.run() | opt_waker.reset();
209 if remaining_tasks {
210 self.poll_with(Some(Duration::ZERO));
211 } else {
212 self.poll();
213 }
214 }
215 })
216 }
217
218 pub fn spawn<F: Future + 'static>(&self, future: F) -> JoinHandle<F::Output> {
223 self.0.executor.spawn(future)
224 }
225
226 pub fn spawn_blocking<T: Send + 'static>(
230 &self,
231 f: impl (FnOnce() -> T) + Send + 'static,
232 ) -> JoinHandle<T> {
233 let op = Asyncify::new(move || {
234 let res = f();
237 BufResult(Ok(0), res)
238 });
239 let submit = self.submit(op);
240 self.spawn(async move { submit.await.1.into_inner() })
241 }
242
243 pub fn attach(&self, fd: RawFd) -> io::Result<()> {
248 self.driver.borrow_mut().attach(fd)
249 }
250
251 fn submit_raw<T: OpCode + 'static>(
252 &self,
253 op: T,
254 extra: Option<Extra>,
255 ) -> PushEntry<Key<T>, BufResult<usize, T>> {
256 let mut this = self.driver.borrow_mut();
257 match extra {
258 Some(e) => this.push_with_extra(op, e),
259 None => this.push(op),
260 }
261 }
262
263 fn default_extra(&self) -> Extra {
264 self.driver.borrow().default_extra()
265 }
266
267 pub fn submit<T: OpCode + 'static>(&self, op: T) -> Submit<T> {
271 Submit::new(self.clone(), op)
272 }
273
274 pub fn submit_multi<T: OpCode + 'static>(&self, op: T) -> SubmitMulti<T> {
278 SubmitMulti::new(self.clone(), op)
279 }
280
281 pub(crate) fn cancel<T: OpCode>(&self, key: Key<T>) {
282 self.driver.borrow_mut().cancel(key);
283 }
284
285 pub(crate) fn register_cancel<T: OpCode>(&self, key: &Key<T>) -> Cancel {
286 self.driver.borrow_mut().register_cancel(key)
287 }
288
289 pub(crate) fn cancel_token(&self, token: Cancel) -> bool {
290 self.driver.borrow_mut().cancel_token(token)
291 }
292
293 #[cfg(feature = "time")]
294 pub(crate) fn cancel_timer(&self, key: &TimerKey) {
295 self.timer_runtime.borrow_mut().cancel(key);
296 }
297
298 pub(crate) fn poll_task<T: OpCode>(
299 &self,
300 waker: &Waker,
301 key: Key<T>,
302 ) -> PushEntry<Key<T>, BufResult<usize, T>> {
303 instrument!(compio_log::Level::DEBUG, "poll_task", ?key);
304 let mut driver = self.driver.borrow_mut();
305 driver.pop(key).map_pending(|k| {
306 driver.update_waker(&k, waker);
307 k
308 })
309 }
310
311 pub(crate) fn poll_task_with_extra<T: OpCode>(
312 &self,
313 waker: &Waker,
314 key: Key<T>,
315 ) -> PushEntry<Key<T>, (BufResult<usize, T>, Extra)> {
316 instrument!(compio_log::Level::DEBUG, "poll_task_with_extra", ?key);
317 let mut driver = self.driver.borrow_mut();
318 driver.pop_with_extra(key).map_pending(|k| {
319 driver.update_waker(&k, waker);
320 k
321 })
322 }
323
324 pub(crate) fn poll_multishot<T: OpCode>(
325 &self,
326 waker: &Waker,
327 key: &Key<T>,
328 ) -> Option<BufResult<usize, Extra>> {
329 instrument!(compio_log::Level::DEBUG, "poll_multishot", ?key);
330 let mut driver = self.driver.borrow_mut();
331 if let Some(res) = driver.pop_multishot(key) {
332 return Some(res);
333 }
334 driver.update_waker(key, waker);
335 None
336 }
337
338 #[cfg(feature = "time")]
339 pub(crate) fn poll_timer(&self, cx: &mut Context, key: &TimerKey) -> Poll<()> {
340 instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key);
341 let mut timer_runtime = self.timer_runtime.borrow_mut();
342 if timer_runtime.is_completed(key) {
343 debug!("ready");
344 Poll::Ready(())
345 } else {
346 debug!("pending");
347 timer_runtime.update_waker(key, cx.waker());
348 Poll::Pending
349 }
350 }
351
352 pub fn current_timeout(&self) -> Option<Duration> {
356 #[cfg(not(feature = "time"))]
357 let timeout = None;
358 #[cfg(feature = "time")]
359 let timeout = self.timer_runtime.borrow().min_timeout();
360 timeout
361 }
362
363 pub fn poll(&self) {
368 instrument!(compio_log::Level::DEBUG, "poll");
369 let timeout = self.current_timeout();
370 debug!("timeout: {:?}", timeout);
371 self.poll_with(timeout)
372 }
373
374 pub fn poll_with(&self, timeout: Option<Duration>) {
378 instrument!(compio_log::Level::DEBUG, "poll_with");
379
380 let mut driver = self.driver.borrow_mut();
381 match driver.poll(timeout) {
382 Ok(()) => {}
383 Err(e) => match e.kind() {
384 io::ErrorKind::TimedOut | io::ErrorKind::Interrupted => {
385 debug!("expected error: {e}");
386 }
387 _ => panic!("{e:?}"),
388 },
389 }
390 #[cfg(feature = "time")]
391 self.timer_runtime.borrow_mut().wake();
392 }
393
394 pub fn buffer_pool(&self) -> io::Result<BufferPool> {
399 self.driver.borrow_mut().buffer_pool()
400 }
401
402 pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
409 self.driver.borrow_mut().register_files(fds)
410 }
411
412 pub fn unregister_files(&self) -> io::Result<()> {
419 self.driver.borrow_mut().unregister_files()
420 }
421
422 pub fn register_personality(&self) -> io::Result<u16> {
432 self.driver.borrow_mut().register_personality()
433 }
434
435 pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
442 self.driver.borrow_mut().unregister_personality(personality)
443 }
444}
445
446impl Drop for Runtime {
447 fn drop(&mut self) {
448 if Rc::strong_count(&self.0) > 1 {
450 return;
451 }
452
453 self.enter(|| {
454 self.executor.clear();
455 })
456 }
457}
458
459impl AsRawFd for Runtime {
460 fn as_raw_fd(&self) -> RawFd {
461 self.driver.borrow().as_raw_fd()
462 }
463}
464
#[cfg(feature = "criterion")]
impl criterion::async_executor::AsyncExecutor for Runtime {
    /// Lets criterion drive benchmark futures through the inherent
    /// `Runtime::block_on`.
    fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
        Runtime::block_on(self, future)
    }
}
471
#[cfg(feature = "criterion")]
impl criterion::async_executor::AsyncExecutor for &Runtime {
    /// Same as the impl for `Runtime`, for callers that only hold a
    /// reference.
    fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
        Runtime::block_on(*self, future)
    }
}
478
479#[derive(Debug, Clone)]
481pub struct RuntimeBuilder {
482 proactor_builder: ProactorBuilder,
483 thread_affinity: HashSet<usize>,
484 sync_queue_size: usize,
485 local_queue_size: usize,
486 event_interval: u32,
487}
488
489impl Default for RuntimeBuilder {
490 fn default() -> Self {
491 Self::new()
492 }
493}
494
495impl RuntimeBuilder {
496 pub fn new() -> Self {
498 Self {
499 proactor_builder: ProactorBuilder::new(),
500 event_interval: 61,
501 sync_queue_size: 64,
502 local_queue_size: 64,
503 thread_affinity: HashSet::new(),
504 }
505 }
506
507 pub fn with_proactor(&mut self, builder: ProactorBuilder) -> &mut Self {
509 self.proactor_builder = builder;
510 self
511 }
512
513 pub fn thread_affinity(&mut self, cpus: HashSet<usize>) -> &mut Self {
515 self.thread_affinity = cpus;
516 self
517 }
518
519 pub fn event_interval(&mut self, val: usize) -> &mut Self {
524 self.event_interval = val as _;
525 self
526 }
527
528 pub fn sync_queue_size(&mut self, val: usize) -> &mut Self {
534 self.sync_queue_size = val;
535 self
536 }
537
538 pub fn local_queue_size(&mut self, val: usize) -> &mut Self {
543 self.local_queue_size = val;
544 self
545 }
546
547 pub fn build(&self) -> io::Result<Runtime> {
549 let RuntimeBuilder {
550 proactor_builder,
551 thread_affinity,
552 sync_queue_size,
553 local_queue_size,
554 event_interval,
555 } = self;
556
557 if !thread_affinity.is_empty() {
558 bind_to_cpu_set(thread_affinity);
559 }
560 let driver = proactor_builder.build()?;
561 let executor = Executor::with_config(ExecutorConfig {
562 max_interval: *event_interval,
563 sync_queue_size: *sync_queue_size,
564 local_queue_size: *local_queue_size,
565 waker: Some(driver.waker()),
566 });
567 let inner = RuntimeInner {
568 executor,
569 driver: RefCell::new(driver),
570 #[cfg(feature = "time")]
571 timer_runtime: RefCell::new(TimerRuntime::new()),
572 };
573 Ok(Runtime(Rc::new(inner)))
574 }
575}
576
577pub fn spawn<F: Future + 'static>(future: F) -> JoinHandle<F::Output> {
603 Runtime::with_current(|r| r.spawn(future))
604}
605
606pub fn spawn_blocking<T: Send + 'static>(
615 f: impl (FnOnce() -> T) + Send + 'static,
616) -> JoinHandle<T> {
617 Runtime::with_current(|r| r.spawn_blocking(f))
618}
619
620pub fn submit<T: OpCode + 'static>(op: T) -> Submit<T> {
628 Runtime::with_current(|r| r.submit(op))
629}
630
631pub fn submit_multi<T: OpCode + 'static>(op: T) -> SubmitMulti<T> {
640 Runtime::with_current(|r| r.submit_multi(op))
641}
642
643pub fn register_files(fds: &[RawFd]) -> io::Result<()> {
656 Runtime::with_current(|r| r.register_files(fds))
657}
658
659pub fn unregister_files() -> io::Result<()> {
672 Runtime::with_current(|r| r.unregister_files())
673}
674
/// Registers a timer for `instant` with the current runtime and waits for it.
///
/// When the timer runtime declines to create an entry (`insert` returns
/// `None`), the function returns immediately without awaiting anything.
/// NOTE(review): presumably that happens when `instant` has already passed;
/// confirm in `TimerRuntime::insert`.
///
/// # Panics
///
/// Panics when called outside of a runtime.
#[cfg(feature = "time")]
pub(crate) async fn create_timer(instant: std::time::Instant) {
    let Some(key) =
        Runtime::with_current(|runtime| runtime.timer_runtime.borrow_mut().insert(instant))
    else {
        return;
    };
    TimerFuture::new(key).await
}