compio_runtime/lib.rs
1//! The compio runtime.
2//!
3//! ```
4//! let ans = compio_runtime::Runtime::new().unwrap().block_on(async {
5//! println!("Hello world!");
6//! 42
7//! });
8//! assert_eq!(ans, 42);
9//! ```
10
11#![cfg_attr(docsrs, feature(doc_cfg))]
12#![cfg_attr(feature = "current_thread_id", feature(current_thread_id))]
13#![allow(unused_features)]
14#![warn(missing_docs)]
15#![deny(rustdoc::broken_intra_doc_links)]
16#![doc(
17 html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
18)]
19#![doc(
20 html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
21)]
22
23mod affinity;
24mod attacher;
25mod cancel;
26mod future;
27mod waker;
28
29pub mod fd;
30
31#[cfg(feature = "time")]
32pub mod time;
33
34use std::{
35 cell::RefCell,
36 collections::HashSet,
37 fmt::Debug,
38 future::Future,
39 io,
40 rc::Rc,
41 task::{Context, Poll, Waker},
42 time::Duration,
43};
44
45use compio_buf::{BufResult, IntoInner};
46use compio_driver::{AsRawFd, DriverType, OpCode, Proactor, ProactorBuilder, RawFd, op::Asyncify};
47pub use compio_driver::{BufferPool, ErrorExt};
48use compio_executor::{Executor, ExecutorConfig};
49pub use compio_executor::{JoinHandle, ResumeUnwind};
50use compio_log::{debug, instrument};
51
52use crate::affinity::bind_to_cpu_set;
53#[cfg(feature = "time")]
54use crate::time::TimerRuntime;
55pub use crate::{attacher::*, cancel::CancelToken, future::*};
56
57scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
58
/// Aborts the current operation because no runtime is set on this thread.
///
/// Marked `#[cold]` so the panic path stays out of the callers' hot path.
///
/// # Panics
///
/// Always panics with the message `not in a compio runtime`.
#[cold]
fn not_in_compio_runtime() -> ! {
    panic!("not in a compio runtime")
}
63
64/// The async runtime of compio.
65///
66/// It is a thread-local runtime, meaning it cannot be sent to other threads.
67#[derive(Clone)]
68pub struct Runtime {
69 executor: Rc<Executor>,
70 driver: Rc<RefCell<Proactor>>,
71 #[cfg(feature = "time")]
72 timer_runtime: Rc<RefCell<TimerRuntime>>,
73}
74
75impl Debug for Runtime {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 let mut s = f.debug_struct("Runtime");
78 s.field("executor", &self.executor);
79 s.field("driver", &"...");
80 #[cfg(feature = "time")]
81 s.field("timer_runtime", &"...");
82 s.finish()
83 }
84}
85
86impl Runtime {
87 /// Create [`Runtime`] with default config.
88 pub fn new() -> io::Result<Self> {
89 Self::builder().build()
90 }
91
92 /// Create a builder for [`Runtime`].
93 pub fn builder() -> RuntimeBuilder {
94 RuntimeBuilder::new()
95 }
96
97 /// The current driver type.
98 pub fn driver_type(&self) -> DriverType {
99 self.driver.borrow().driver_type()
100 }
101
102 /// Try to perform a function on the current runtime, and if no runtime is
103 /// running, return the function back.
104 pub fn try_with_current<T, F: FnOnce(&Self) -> T>(f: F) -> Result<T, F> {
105 if CURRENT_RUNTIME.is_set() {
106 Ok(CURRENT_RUNTIME.with(f))
107 } else {
108 Err(f)
109 }
110 }
111
112 /// Perform a function on the current runtime.
113 ///
114 /// ## Panics
115 ///
116 /// This method will panic if there is no running [`Runtime`].
117 pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
118 if CURRENT_RUNTIME.is_set() {
119 CURRENT_RUNTIME.with(f)
120 } else {
121 not_in_compio_runtime()
122 }
123 }
124
125 /// Try to get the current runtime, and if no runtime is running, return
126 /// `None`.
127 pub fn try_current() -> Option<Self> {
128 if CURRENT_RUNTIME.is_set() {
129 Some(CURRENT_RUNTIME.with(|r| r.clone()))
130 } else {
131 None
132 }
133 }
134
135 /// Get the current runtime.
136 ///
137 /// # Panics
138 ///
139 /// This method will panic if there is no running [`Runtime`].
140 pub fn current() -> Self {
141 if CURRENT_RUNTIME.is_set() {
142 CURRENT_RUNTIME.with(|r| r.clone())
143 } else {
144 not_in_compio_runtime()
145 }
146 }
147
148 /// Set this runtime as current runtime, and perform a function in the
149 /// current scope.
150 pub fn enter<T, F: FnOnce() -> T>(&self, f: F) -> T {
151 CURRENT_RUNTIME.set(self, f)
152 }
153
154 /// Low level API to control the runtime.
155 ///
156 /// Run the scheduled tasks.
157 ///
158 /// The return value indicates whether there are still tasks in the queue.
159 pub fn run(&self) -> bool {
160 self.executor.tick()
161 }
162
163 /// Low level API to control the runtime.
164 ///
165 /// Create a waker that always notifies the runtime when woken.
166 pub fn waker(&self) -> Waker {
167 self.driver.borrow().waker()
168 }
169
170 /// Block on the future till it completes.
171 pub fn block_on<F: Future>(&self, future: F) -> F::Output {
172 self.enter(|| {
173 let waker = self.waker();
174 let mut context = Context::from_waker(&waker);
175 let mut future = std::pin::pin!(future);
176 loop {
177 if let Poll::Ready(result) = future.as_mut().poll(&mut context) {
178 self.run();
179 return result;
180 }
181 let remaining_tasks = self.run();
182 if remaining_tasks {
183 self.poll_with(Some(Duration::ZERO));
184 } else {
185 self.poll();
186 }
187 }
188 })
189 }
190
191 /// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
192 ///
193 /// Spawning a task enables the task to execute concurrently to other tasks.
194 /// There is no guarantee that a spawned task will execute to completion.
195 pub fn spawn<F: Future + 'static>(&self, future: F) -> JoinHandle<F::Output> {
196 self.executor.spawn(future)
197 }
198
199 /// Spawns a blocking task in a new thread, and wait for it.
200 ///
201 /// The task will not be cancelled even if the future is dropped.
202 pub fn spawn_blocking<T: Send + 'static>(
203 &self,
204 f: impl (FnOnce() -> T) + Send + 'static,
205 ) -> JoinHandle<T> {
206 use futures_util::FutureExt;
207
208 let op = Asyncify::new(move || {
209 // TODO: Refactor blocking pool and handle panic within worker and propagate it
210 // back
211 let res = f();
212 BufResult(Ok(0), res)
213 });
214 let submit = self.submit(op);
215 self.spawn(submit.map(|res| res.1.into_inner()))
216 }
217
218 /// Attach a raw file descriptor/handle/socket to the runtime.
219 ///
220 /// You only need this when authoring your own high-level APIs. High-level
221 /// resources in this crate are attached automatically.
222 pub fn attach(&self, fd: RawFd) -> io::Result<()> {
223 self.driver.borrow_mut().attach(fd)
224 }
225
226 /// Submit an operation to the runtime.
227 ///
228 /// You only need this when authoring your own [`OpCode`].
229 pub fn submit<T: OpCode + 'static>(&self, op: T) -> Submit<T> {
230 Submit::new(self.driver.clone(), op)
231 }
232
233 /// Submit a multishot operation to the runtime.
234 ///
235 /// You only need this when authoring your own [`OpCode`].
236 pub fn submit_multi<T: OpCode + 'static>(&self, op: T) -> SubmitMulti<T> {
237 SubmitMulti::new(self.driver.clone(), op)
238 }
239
240 /// Flush the driver and return whether the driver has been notified.
241 ///
242 /// See [`Proactor::flush`] for more details.
243 pub fn flush(&self) -> bool {
244 self.driver.borrow_mut().flush()
245 }
246
247 /// Low level API to control the runtime.
248 ///
249 /// Get the timeout value to be passed to [`Proactor::poll`].
250 pub fn current_timeout(&self) -> Option<Duration> {
251 #[cfg(not(feature = "time"))]
252 let timeout = None;
253 #[cfg(feature = "time")]
254 let timeout = self.timer_runtime.borrow().min_timeout();
255 timeout
256 }
257
258 /// Low level API to control the runtime.
259 ///
260 /// Poll the inner proactor. It is equal to calling [`Runtime::poll_with`]
261 /// with [`Runtime::current_timeout`].
262 pub fn poll(&self) {
263 instrument!(compio_log::Level::DEBUG, "poll");
264 let timeout = self.current_timeout();
265 debug!("timeout: {:?}", timeout);
266 self.poll_with(timeout)
267 }
268
269 /// Low level API to control the runtime.
270 ///
271 /// Poll the inner proactor with a custom timeout.
272 pub fn poll_with(&self, timeout: Option<Duration>) {
273 instrument!(compio_log::Level::DEBUG, "poll_with");
274
275 let mut driver = self.driver.borrow_mut();
276 match driver.poll(timeout) {
277 Ok(()) => {}
278 Err(e) => match e.kind() {
279 io::ErrorKind::TimedOut | io::ErrorKind::Interrupted => {
280 debug!("expected error: {e}");
281 }
282 _ => panic!("{e:?}"),
283 },
284 }
285 #[cfg(feature = "time")]
286 self.timer_runtime.borrow_mut().wake();
287 }
288
289 /// Get buffer pool of the runtime.
290 ///
291 /// This will lazily initialize the pool at the first time it's accessed,
292 /// and future access to the pool will be cheap and infallible.
293 pub fn buffer_pool(&self) -> io::Result<BufferPool> {
294 self.driver.borrow_mut().buffer_pool()
295 }
296
297 /// Register file descriptors for fixed-file operations.
298 ///
299 /// This is only supported on io-uring driver, and will return an
300 /// [`Unsupported`] io error on all other drivers.
301 ///
302 /// [`Unsupported`]: std::io::ErrorKind::Unsupported
303 pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
304 self.driver.borrow_mut().register_files(fds)
305 }
306
307 /// Unregister previously registered file descriptors.
308 ///
309 /// This is only supported on io-uring driver, and will return an
310 /// [`Unsupported`] io error on all other drivers.
311 ///
312 /// [`Unsupported`]: std::io::ErrorKind::Unsupported
313 pub fn unregister_files(&self) -> io::Result<()> {
314 self.driver.borrow_mut().unregister_files()
315 }
316
317 /// Register the personality for the runtime.
318 ///
319 /// This is only supported on io-uring driver, and will return an
320 /// [`Unsupported`] io error on all other drivers.
321 ///
322 /// The returned personality can be used with
323 /// [`FutureExt::with_personality`].
324 ///
325 /// [`Unsupported`]: std::io::ErrorKind::Unsupported
326 pub fn register_personality(&self) -> io::Result<u16> {
327 self.driver.borrow_mut().register_personality()
328 }
329
330 /// Unregister the given personality for the runtime.
331 ///
332 /// This is only supported on io-uring driver, and will return an
333 /// [`Unsupported`] io error on all other drivers.
334 ///
335 /// [`Unsupported`]: std::io::ErrorKind::Unsupported
336 pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
337 self.driver.borrow_mut().unregister_personality(personality)
338 }
339}
340
341impl Drop for Runtime {
342 fn drop(&mut self) {
343 // this is not the last runtime reference, no need to clear
344 if Rc::strong_count(&self.executor) > 1 {
345 return;
346 }
347
348 self.enter(|| {
349 self.executor.clear();
350 })
351 }
352}
353
354impl AsRawFd for Runtime {
355 fn as_raw_fd(&self) -> RawFd {
356 self.driver.borrow().as_raw_fd()
357 }
358}
359
/// Lets criterion drive async benchmarks with an owned [`Runtime`].
#[cfg(feature = "criterion")]
impl criterion::async_executor::AsyncExecutor for Runtime {
    /// Delegates to the inherent [`Runtime::block_on`].
    fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
        Runtime::block_on(self, future)
    }
}
366
/// Lets criterion drive async benchmarks with a borrowed [`Runtime`].
#[cfg(feature = "criterion")]
impl criterion::async_executor::AsyncExecutor for &Runtime {
    /// Delegates to the inherent [`Runtime::block_on`] of the referenced
    /// runtime.
    fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
        Runtime::block_on(*self, future)
    }
}
373
374/// Builder for [`Runtime`].
375#[derive(Debug, Clone)]
376pub struct RuntimeBuilder {
377 proactor_builder: ProactorBuilder,
378 thread_affinity: HashSet<usize>,
379 sync_queue_size: usize,
380 local_queue_size: usize,
381 event_interval: u32,
382}
383
384impl Default for RuntimeBuilder {
385 fn default() -> Self {
386 Self::new()
387 }
388}
389
390impl RuntimeBuilder {
391 /// Create the builder with default config.
392 pub fn new() -> Self {
393 Self {
394 proactor_builder: ProactorBuilder::new(),
395 event_interval: 61,
396 sync_queue_size: 64,
397 local_queue_size: 64,
398 thread_affinity: HashSet::new(),
399 }
400 }
401
402 /// Replace proactor builder.
403 pub fn with_proactor(&mut self, builder: ProactorBuilder) -> &mut Self {
404 self.proactor_builder = builder;
405 self
406 }
407
408 /// Sets the thread affinity for the runtime.
409 pub fn thread_affinity(&mut self, cpus: HashSet<usize>) -> &mut Self {
410 self.thread_affinity = cpus;
411 self
412 }
413
414 /// Sets the number of scheduler ticks after which the scheduler will poll
415 /// for external events (timers, I/O, and so on).
416 ///
417 /// A scheduler “tick” roughly corresponds to one poll invocation on a task.
418 pub fn event_interval(&mut self, val: usize) -> &mut Self {
419 self.event_interval = val as _;
420 self
421 }
422
423 /// The size of the sync queue, which is used to wake up tasks from other
424 /// threads (remote).
425 ///
426 /// This is fixed and will create backpressure in other remote threads when
427 /// full.
428 pub fn sync_queue_size(&mut self, val: usize) -> &mut Self {
429 self.sync_queue_size = val;
430 self
431 }
432
433 /// The size of the local queues, which is used to wake up tasks within the
434 /// same thread.
435 ///
436 /// This is dynamically resized to avoid blocking.
437 pub fn local_queue_size(&mut self, val: usize) -> &mut Self {
438 self.local_queue_size = val;
439 self
440 }
441
442 /// Build [`Runtime`].
443 pub fn build(&self) -> io::Result<Runtime> {
444 let RuntimeBuilder {
445 proactor_builder,
446 thread_affinity,
447 sync_queue_size,
448 local_queue_size,
449 event_interval,
450 } = self;
451
452 if !thread_affinity.is_empty() {
453 bind_to_cpu_set(thread_affinity);
454 }
455 let driver = proactor_builder.build()?;
456 let executor = Executor::with_config(ExecutorConfig {
457 max_interval: *event_interval,
458 sync_queue_size: *sync_queue_size,
459 local_queue_size: *local_queue_size,
460 waker: Some(driver.waker()),
461 });
462 Ok(Runtime {
463 executor: Rc::new(executor),
464 driver: Rc::new(RefCell::new(driver)),
465 #[cfg(feature = "time")]
466 timer_runtime: Rc::new(RefCell::new(TimerRuntime::new())),
467 })
468 }
469}
470
471/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
472///
473/// Spawning a task enables the task to execute concurrently to other tasks.
474/// There is no guarantee that a spawned task will execute to completion.
475///
476/// ```
477/// # compio_runtime::Runtime::new().unwrap().block_on(async {
478/// use compio_runtime::ResumeUnwind;
479///
480/// let task = compio_runtime::spawn(async {
481/// println!("Hello from a spawned task!");
482/// 42
483/// });
484///
485/// assert_eq!(
486/// task.await.resume_unwind().expect("shouldn't be cancelled"),
487/// 42
488/// );
489/// # })
490/// ```
491///
492/// ## Panics
493///
494/// This method doesn't create runtime. It tries to obtain the current runtime
495/// by [`Runtime::with_current`].
496pub fn spawn<F: Future + 'static>(future: F) -> JoinHandle<F::Output> {
497 Runtime::with_current(|r| r.spawn(future))
498}
499
500/// Spawns a blocking task in a new thread, and wait for it.
501///
502/// The task will not be cancelled even if the future is dropped.
503///
504/// ## Panics
505///
506/// This method doesn't create runtime. It tries to obtain the current runtime
507/// by [`Runtime::with_current`].
508pub fn spawn_blocking<T: Send + 'static>(
509 f: impl (FnOnce() -> T) + Send + 'static,
510) -> JoinHandle<T> {
511 Runtime::with_current(|r| r.spawn_blocking(f))
512}
513
514/// Submit an operation to the current runtime, and return a future for it.
515///
516/// ## Panics
517///
518/// This method doesn't create runtime and will panic if it's not within a
519/// runtime. It tries to obtain the current runtime with
520/// [`Runtime::with_current`].
521pub fn submit<T: OpCode + 'static>(op: T) -> Submit<T> {
522 Runtime::with_current(|r| r.submit(op))
523}
524
525/// Submit a multishot operation to the current runtime, and return a stream for
526/// it.
527///
528/// ## Panics
529///
530/// This method doesn't create runtime and will panic if it's not within a
531/// runtime. It tries to obtain the current runtime with
532/// [`Runtime::with_current`].
533pub fn submit_multi<T: OpCode + 'static>(op: T) -> SubmitMulti<T> {
534 Runtime::with_current(|r| r.submit_multi(op))
535}
536
537/// Register file descriptors for fixed-file operations with the current
538/// runtime's io_uring instance.
539///
540/// This only works on `io_uring` driver. It will return an [`Unsupported`]
541/// error on other drivers.
542///
543/// ## Panics
544///
545/// This method doesn't create runtime. It tries to obtain the current runtime
546/// by [`Runtime::with_current`].
547///
548/// [`Unsupported`]: std::io::ErrorKind::Unsupported
549pub fn register_files(fds: &[RawFd]) -> io::Result<()> {
550 Runtime::with_current(|r| r.register_files(fds))
551}
552
553/// Unregister previously registered file descriptors from the current
554/// runtime's io_uring instance.
555///
556/// This only works on `io_uring` driver. It will return an [`Unsupported`]
557/// error on other drivers.
558///
559/// ## Panics
560///
561/// This method doesn't create runtime. It tries to obtain the current runtime
562/// by [`Runtime::with_current`].
563///
564/// [`Unsupported`]: std::io::ErrorKind::Unsupported
565pub fn unregister_files() -> io::Result<()> {
566 Runtime::with_current(|r| r.unregister_files())
567}