1#![cfg_attr(docsrs, feature(doc_cfg))]
12#![cfg_attr(feature = "current_thread_id", feature(current_thread_id))]
13#![cfg_attr(feature = "read_buf", feature(read_buf, core_io_borrowed_buf))]
14#![allow(unused_features)]
15#![warn(missing_docs)]
16#![deny(rustdoc::broken_intra_doc_links)]
17#![doc(
18 html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
19)]
20#![doc(
21 html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
22)]
23
24mod affinity;
25mod attacher;
26mod cancel;
27mod future;
28mod waker;
29
30pub mod fd;
31
32#[cfg(feature = "time")]
33pub mod time;
34
35use std::{
36 cell::RefCell,
37 collections::HashSet,
38 fmt::Debug,
39 future::Future,
40 io,
41 rc::Rc,
42 task::{Context, Poll, Waker},
43 time::Duration,
44};
45
46use compio_buf::{BufResult, IntoInner};
47use compio_driver::{AsRawFd, DriverType, OpCode, Proactor, ProactorBuilder, RawFd, op::Asyncify};
48pub use compio_driver::{BufferPool, ErrorExt};
49use compio_executor::{Executor, ExecutorConfig};
50pub use compio_executor::{JoinHandle, ResumeUnwind};
51use compio_log::{debug, instrument};
52
53use crate::affinity::bind_to_cpu_set;
54#[cfg(feature = "time")]
55use crate::time::TimerRuntime;
56pub use crate::{attacher::*, cancel::CancelToken, future::*};
57
58scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
59
/// Panics with the message `"not in a compio runtime"`.
///
/// Shared failure path for the accessors that require a current runtime.
/// It is marked `#[cold]` because it is only reached on misuse, which keeps
/// the panic machinery out of the callers' fast path.
#[cold]
fn not_in_compio_runtime() -> ! {
    panic!("not in a compio runtime")
}
64
65#[derive(Clone)]
69pub struct Runtime {
70 executor: Rc<Executor>,
71 driver: Rc<RefCell<Proactor>>,
72 #[cfg(feature = "time")]
73 timer_runtime: Rc<RefCell<TimerRuntime>>,
74}
75
76impl Debug for Runtime {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 let mut s = f.debug_struct("Runtime");
79 s.field("executor", &self.executor);
80 s.field("driver", &"...");
81 #[cfg(feature = "time")]
82 s.field("timer_runtime", &"...");
83 s.finish()
84 }
85}
86
87impl Runtime {
88 pub fn new() -> io::Result<Self> {
90 Self::builder().build()
91 }
92
93 pub fn builder() -> RuntimeBuilder {
95 RuntimeBuilder::new()
96 }
97
98 pub fn driver_type(&self) -> DriverType {
100 self.driver.borrow().driver_type()
101 }
102
103 pub fn try_with_current<T, F: FnOnce(&Self) -> T>(f: F) -> Result<T, F> {
106 if CURRENT_RUNTIME.is_set() {
107 Ok(CURRENT_RUNTIME.with(f))
108 } else {
109 Err(f)
110 }
111 }
112
113 pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
119 if CURRENT_RUNTIME.is_set() {
120 CURRENT_RUNTIME.with(f)
121 } else {
122 not_in_compio_runtime()
123 }
124 }
125
126 pub fn try_current() -> Option<Self> {
129 if CURRENT_RUNTIME.is_set() {
130 Some(CURRENT_RUNTIME.with(|r| r.clone()))
131 } else {
132 None
133 }
134 }
135
136 pub fn current() -> Self {
142 if CURRENT_RUNTIME.is_set() {
143 CURRENT_RUNTIME.with(|r| r.clone())
144 } else {
145 not_in_compio_runtime()
146 }
147 }
148
149 pub fn enter<T, F: FnOnce() -> T>(&self, f: F) -> T {
152 CURRENT_RUNTIME.set(self, f)
153 }
154
155 pub fn run(&self) -> bool {
161 self.executor.tick()
162 }
163
164 pub fn waker(&self) -> Waker {
168 self.driver.borrow().waker()
169 }
170
171 pub fn block_on<F: Future>(&self, future: F) -> F::Output {
173 self.enter(|| {
174 let waker = self.waker();
175 let mut context = Context::from_waker(&waker);
176 let mut future = std::pin::pin!(future);
177 loop {
178 if let Poll::Ready(result) = future.as_mut().poll(&mut context) {
179 self.run();
180 return result;
181 }
182 let remaining_tasks = self.run();
183 if remaining_tasks {
184 self.poll_with(Some(Duration::ZERO));
185 } else {
186 self.poll();
187 }
188 }
189 })
190 }
191
192 pub fn spawn<F: Future + 'static>(&self, future: F) -> JoinHandle<F::Output> {
197 self.executor.spawn(future)
198 }
199
200 pub fn spawn_blocking<T: Send + 'static>(
204 &self,
205 f: impl (FnOnce() -> T) + Send + 'static,
206 ) -> JoinHandle<T> {
207 use futures_util::FutureExt;
208
209 let op = Asyncify::new(move || {
210 let res = f();
213 BufResult(Ok(0), res)
214 });
215 let submit = self.submit(op);
216 self.spawn(submit.map(|res| res.1.into_inner()))
217 }
218
219 pub fn attach(&self, fd: RawFd) -> io::Result<()> {
224 self.driver.borrow_mut().attach(fd)
225 }
226
227 pub fn submit<T: OpCode + 'static>(&self, op: T) -> Submit<T> {
231 Submit::new(self.driver.clone(), op)
232 }
233
234 pub fn submit_multi<T: OpCode + 'static>(&self, op: T) -> SubmitMulti<T> {
238 SubmitMulti::new(self.driver.clone(), op)
239 }
240
241 pub fn flush(&self) -> bool {
245 self.driver.borrow_mut().flush()
246 }
247
248 pub fn current_timeout(&self) -> Option<Duration> {
252 #[cfg(not(feature = "time"))]
253 let timeout = None;
254 #[cfg(feature = "time")]
255 let timeout = self.timer_runtime.borrow().min_timeout();
256 timeout
257 }
258
259 pub fn poll(&self) {
264 instrument!(compio_log::Level::DEBUG, "poll");
265 let timeout = self.current_timeout();
266 debug!("timeout: {:?}", timeout);
267 self.poll_with(timeout)
268 }
269
270 pub fn poll_with(&self, timeout: Option<Duration>) {
274 instrument!(compio_log::Level::DEBUG, "poll_with");
275
276 let mut driver = self.driver.borrow_mut();
277 match driver.poll(timeout) {
278 Ok(()) => {}
279 Err(e) => match e.kind() {
280 io::ErrorKind::TimedOut | io::ErrorKind::Interrupted => {
281 debug!("expected error: {e}");
282 }
283 _ => panic!("{e:?}"),
284 },
285 }
286 #[cfg(feature = "time")]
287 self.timer_runtime.borrow_mut().wake();
288 }
289
290 pub fn buffer_pool(&self) -> io::Result<BufferPool> {
295 self.driver.borrow_mut().buffer_pool()
296 }
297
298 pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
305 self.driver.borrow_mut().register_files(fds)
306 }
307
308 pub fn unregister_files(&self) -> io::Result<()> {
315 self.driver.borrow_mut().unregister_files()
316 }
317
318 pub fn register_personality(&self) -> io::Result<u16> {
328 self.driver.borrow_mut().register_personality()
329 }
330
331 pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
338 self.driver.borrow_mut().unregister_personality(personality)
339 }
340}
341
342impl Drop for Runtime {
343 fn drop(&mut self) {
344 if Rc::strong_count(&self.executor) > 1 {
346 return;
347 }
348
349 self.enter(|| {
350 self.executor.clear();
351 })
352 }
353}
354
355impl AsRawFd for Runtime {
356 fn as_raw_fd(&self) -> RawFd {
357 self.driver.borrow().as_raw_fd()
358 }
359}
360
#[cfg(feature = "criterion")]
impl criterion::async_executor::AsyncExecutor for Runtime {
    /// Lets criterion drive async benchmarks by delegating to the inherent
    /// [`Runtime::block_on`].
    fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
        Runtime::block_on(self, future)
    }
}
367
#[cfg(feature = "criterion")]
impl criterion::async_executor::AsyncExecutor for &Runtime {
    /// Same as the impl for `Runtime`, for callers that only hold a
    /// reference; delegates to the inherent [`Runtime::block_on`].
    fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
        Runtime::block_on(*self, future)
    }
}
374
375#[derive(Debug, Clone)]
377pub struct RuntimeBuilder {
378 proactor_builder: ProactorBuilder,
379 thread_affinity: HashSet<usize>,
380 sync_queue_size: usize,
381 local_queue_size: usize,
382 event_interval: u32,
383}
384
385impl Default for RuntimeBuilder {
386 fn default() -> Self {
387 Self::new()
388 }
389}
390
391impl RuntimeBuilder {
392 pub fn new() -> Self {
394 Self {
395 proactor_builder: ProactorBuilder::new(),
396 event_interval: 61,
397 sync_queue_size: 64,
398 local_queue_size: 64,
399 thread_affinity: HashSet::new(),
400 }
401 }
402
403 pub fn with_proactor(&mut self, builder: ProactorBuilder) -> &mut Self {
405 self.proactor_builder = builder;
406 self
407 }
408
409 pub fn thread_affinity(&mut self, cpus: HashSet<usize>) -> &mut Self {
411 self.thread_affinity = cpus;
412 self
413 }
414
415 pub fn event_interval(&mut self, val: usize) -> &mut Self {
420 self.event_interval = val as _;
421 self
422 }
423
424 pub fn sync_queue_size(&mut self, val: usize) -> &mut Self {
430 self.sync_queue_size = val;
431 self
432 }
433
434 pub fn local_queue_size(&mut self, val: usize) -> &mut Self {
439 self.local_queue_size = val;
440 self
441 }
442
443 pub fn build(&self) -> io::Result<Runtime> {
445 let RuntimeBuilder {
446 proactor_builder,
447 thread_affinity,
448 sync_queue_size,
449 local_queue_size,
450 event_interval,
451 } = self;
452
453 if !thread_affinity.is_empty() {
454 bind_to_cpu_set(thread_affinity);
455 }
456 let driver = proactor_builder.build()?;
457 let executor = Executor::with_config(ExecutorConfig {
458 max_interval: *event_interval,
459 sync_queue_size: *sync_queue_size,
460 local_queue_size: *local_queue_size,
461 waker: Some(driver.waker()),
462 });
463 Ok(Runtime {
464 executor: Rc::new(executor),
465 driver: Rc::new(RefCell::new(driver)),
466 #[cfg(feature = "time")]
467 timer_runtime: Rc::new(RefCell::new(TimerRuntime::new())),
468 })
469 }
470}
471
472pub fn spawn<F: Future + 'static>(future: F) -> JoinHandle<F::Output> {
498 Runtime::with_current(|r| r.spawn(future))
499}
500
501pub fn spawn_blocking<T: Send + 'static>(
510 f: impl (FnOnce() -> T) + Send + 'static,
511) -> JoinHandle<T> {
512 Runtime::with_current(|r| r.spawn_blocking(f))
513}
514
515pub fn submit<T: OpCode + 'static>(op: T) -> Submit<T> {
523 Runtime::with_current(|r| r.submit(op))
524}
525
526pub fn submit_multi<T: OpCode + 'static>(op: T) -> SubmitMulti<T> {
535 Runtime::with_current(|r| r.submit_multi(op))
536}
537
538pub fn register_files(fds: &[RawFd]) -> io::Result<()> {
551 Runtime::with_current(|r| r.register_files(fds))
552}
553
554pub fn unregister_files() -> io::Result<()> {
567 Runtime::with_current(|r| r.unregister_files())
568}