Skip to main content

compio_runtime/
lib.rs

1//! The compio runtime.
2//!
3//! ```
4//! let ans = compio_runtime::Runtime::new().unwrap().block_on(async {
5//!     println!("Hello world!");
6//!     42
7//! });
8//! assert_eq!(ans, 42);
9//! ```
10
11#![cfg_attr(docsrs, feature(doc_cfg))]
12#![cfg_attr(feature = "current_thread_id", feature(current_thread_id))]
13#![allow(unused_features)]
14#![warn(missing_docs)]
15#![deny(rustdoc::broken_intra_doc_links)]
16#![doc(
17    html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
18)]
19#![doc(
20    html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
21)]
22
23mod affinity;
24mod attacher;
25mod cancel;
26mod future;
27mod waker;
28
29pub mod fd;
30
31#[cfg(feature = "time")]
32pub mod time;
33
34use std::{
35    cell::RefCell,
36    collections::HashSet,
37    fmt::Debug,
38    future::Future,
39    io,
40    rc::Rc,
41    task::{Context, Poll, Waker},
42    time::Duration,
43};
44
45use compio_buf::{BufResult, IntoInner};
46use compio_driver::{AsRawFd, DriverType, OpCode, Proactor, ProactorBuilder, RawFd, op::Asyncify};
47pub use compio_driver::{BufferPool, ErrorExt};
48use compio_executor::{Executor, ExecutorConfig};
49pub use compio_executor::{JoinHandle, ResumeUnwind};
50use compio_log::{debug, instrument};
51
52use crate::affinity::bind_to_cpu_set;
53#[cfg(feature = "time")]
54use crate::time::TimerRuntime;
55pub use crate::{attacher::*, cancel::CancelToken, future::*};
56
// Scoped thread-local slot holding the runtime that is currently entered on
// this thread. It is only set for the duration of `Runtime::enter` (and thus
// `Runtime::block_on`); the `Runtime::*current*` accessors read it.
scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
58
/// Shared panic path for APIs that require a current runtime.
///
/// Marked `#[cold]` because callers only reach it on the failure branch.
#[cold]
fn not_in_compio_runtime() -> ! {
    std::panic!("not in a compio runtime")
}
63
64/// The async runtime of compio.
65///
66/// It is a thread-local runtime, meaning it cannot be sent to other threads.
67#[derive(Clone)]
68pub struct Runtime {
69    executor: Rc<Executor>,
70    driver: Rc<RefCell<Proactor>>,
71    #[cfg(feature = "time")]
72    timer_runtime: Rc<RefCell<TimerRuntime>>,
73}
74
75impl Debug for Runtime {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        let mut s = f.debug_struct("Runtime");
78        s.field("executor", &self.executor);
79        s.field("driver", &"...");
80        #[cfg(feature = "time")]
81        s.field("timer_runtime", &"...");
82        s.finish()
83    }
84}
85
86impl Runtime {
87    /// Create [`Runtime`] with default config.
88    pub fn new() -> io::Result<Self> {
89        Self::builder().build()
90    }
91
92    /// Create a builder for [`Runtime`].
93    pub fn builder() -> RuntimeBuilder {
94        RuntimeBuilder::new()
95    }
96
97    /// The current driver type.
98    pub fn driver_type(&self) -> DriverType {
99        self.driver.borrow().driver_type()
100    }
101
102    /// Try to perform a function on the current runtime, and if no runtime is
103    /// running, return the function back.
104    pub fn try_with_current<T, F: FnOnce(&Self) -> T>(f: F) -> Result<T, F> {
105        if CURRENT_RUNTIME.is_set() {
106            Ok(CURRENT_RUNTIME.with(f))
107        } else {
108            Err(f)
109        }
110    }
111
112    /// Perform a function on the current runtime.
113    ///
114    /// ## Panics
115    ///
116    /// This method will panic if there is no running [`Runtime`].
117    pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
118        if CURRENT_RUNTIME.is_set() {
119            CURRENT_RUNTIME.with(f)
120        } else {
121            not_in_compio_runtime()
122        }
123    }
124
125    /// Try to get the current runtime, and if no runtime is running, return
126    /// `None`.
127    pub fn try_current() -> Option<Self> {
128        if CURRENT_RUNTIME.is_set() {
129            Some(CURRENT_RUNTIME.with(|r| r.clone()))
130        } else {
131            None
132        }
133    }
134
135    /// Get the current runtime.
136    ///
137    /// # Panics
138    ///
139    /// This method will panic if there is no running [`Runtime`].
140    pub fn current() -> Self {
141        if CURRENT_RUNTIME.is_set() {
142            CURRENT_RUNTIME.with(|r| r.clone())
143        } else {
144            not_in_compio_runtime()
145        }
146    }
147
148    /// Set this runtime as current runtime, and perform a function in the
149    /// current scope.
150    pub fn enter<T, F: FnOnce() -> T>(&self, f: F) -> T {
151        CURRENT_RUNTIME.set(self, f)
152    }
153
154    /// Low level API to control the runtime.
155    ///
156    /// Run the scheduled tasks.
157    ///
158    /// The return value indicates whether there are still tasks in the queue.
159    pub fn run(&self) -> bool {
160        self.executor.tick()
161    }
162
163    /// Low level API to control the runtime.
164    ///
165    /// Create a waker that always notifies the runtime when woken.
166    pub fn waker(&self) -> Waker {
167        self.driver.borrow().waker()
168    }
169
170    /// Block on the future till it completes.
171    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
172        self.enter(|| {
173            let waker = self.waker();
174            let mut context = Context::from_waker(&waker);
175            let mut future = std::pin::pin!(future);
176            loop {
177                if let Poll::Ready(result) = future.as_mut().poll(&mut context) {
178                    self.run();
179                    return result;
180                }
181                let remaining_tasks = self.run();
182                if remaining_tasks {
183                    self.poll_with(Some(Duration::ZERO));
184                } else {
185                    self.poll();
186                }
187            }
188        })
189    }
190
191    /// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
192    ///
193    /// Spawning a task enables the task to execute concurrently to other tasks.
194    /// There is no guarantee that a spawned task will execute to completion.
195    pub fn spawn<F: Future + 'static>(&self, future: F) -> JoinHandle<F::Output> {
196        self.executor.spawn(future)
197    }
198
199    /// Spawns a blocking task in a new thread, and wait for it.
200    ///
201    /// The task will not be cancelled even if the future is dropped.
202    pub fn spawn_blocking<T: Send + 'static>(
203        &self,
204        f: impl (FnOnce() -> T) + Send + 'static,
205    ) -> JoinHandle<T> {
206        use futures_util::FutureExt;
207
208        let op = Asyncify::new(move || {
209            // TODO: Refactor blocking pool and handle panic within worker and propagate it
210            // back
211            let res = f();
212            BufResult(Ok(0), res)
213        });
214        let submit = self.submit(op);
215        self.spawn(submit.map(|res| res.1.into_inner()))
216    }
217
218    /// Attach a raw file descriptor/handle/socket to the runtime.
219    ///
220    /// You only need this when authoring your own high-level APIs. High-level
221    /// resources in this crate are attached automatically.
222    pub fn attach(&self, fd: RawFd) -> io::Result<()> {
223        self.driver.borrow_mut().attach(fd)
224    }
225
226    /// Submit an operation to the runtime.
227    ///
228    /// You only need this when authoring your own [`OpCode`].
229    pub fn submit<T: OpCode + 'static>(&self, op: T) -> Submit<T> {
230        Submit::new(self.driver.clone(), op)
231    }
232
233    /// Submit a multishot operation to the runtime.
234    ///
235    /// You only need this when authoring your own [`OpCode`].
236    pub fn submit_multi<T: OpCode + 'static>(&self, op: T) -> SubmitMulti<T> {
237        SubmitMulti::new(self.driver.clone(), op)
238    }
239
240    /// Flush the driver and return whether the driver has been notified.
241    ///
242    /// See [`Proactor::flush`] for more details.
243    pub fn flush(&self) -> bool {
244        self.driver.borrow_mut().flush()
245    }
246
247    /// Low level API to control the runtime.
248    ///
249    /// Get the timeout value to be passed to [`Proactor::poll`].
250    pub fn current_timeout(&self) -> Option<Duration> {
251        #[cfg(not(feature = "time"))]
252        let timeout = None;
253        #[cfg(feature = "time")]
254        let timeout = self.timer_runtime.borrow().min_timeout();
255        timeout
256    }
257
258    /// Low level API to control the runtime.
259    ///
260    /// Poll the inner proactor. It is equal to calling [`Runtime::poll_with`]
261    /// with [`Runtime::current_timeout`].
262    pub fn poll(&self) {
263        instrument!(compio_log::Level::DEBUG, "poll");
264        let timeout = self.current_timeout();
265        debug!("timeout: {:?}", timeout);
266        self.poll_with(timeout)
267    }
268
269    /// Low level API to control the runtime.
270    ///
271    /// Poll the inner proactor with a custom timeout.
272    pub fn poll_with(&self, timeout: Option<Duration>) {
273        instrument!(compio_log::Level::DEBUG, "poll_with");
274
275        let mut driver = self.driver.borrow_mut();
276        match driver.poll(timeout) {
277            Ok(()) => {}
278            Err(e) => match e.kind() {
279                io::ErrorKind::TimedOut | io::ErrorKind::Interrupted => {
280                    debug!("expected error: {e}");
281                }
282                _ => panic!("{e:?}"),
283            },
284        }
285        #[cfg(feature = "time")]
286        self.timer_runtime.borrow_mut().wake();
287    }
288
289    /// Get buffer pool of the runtime.
290    ///
291    /// This will lazily initialize the pool at the first time it's accessed,
292    /// and future access to the pool will be cheap and infallible.
293    pub fn buffer_pool(&self) -> io::Result<BufferPool> {
294        self.driver.borrow_mut().buffer_pool()
295    }
296
297    /// Register file descriptors for fixed-file operations.
298    ///
299    /// This is only supported on io-uring driver, and will return an
300    /// [`Unsupported`] io error on all other drivers.
301    ///
302    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
303    pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
304        self.driver.borrow_mut().register_files(fds)
305    }
306
307    /// Unregister previously registered file descriptors.
308    ///
309    /// This is only supported on io-uring driver, and will return an
310    /// [`Unsupported`] io error on all other drivers.
311    ///
312    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
313    pub fn unregister_files(&self) -> io::Result<()> {
314        self.driver.borrow_mut().unregister_files()
315    }
316
317    /// Register the personality for the runtime.
318    ///
319    /// This is only supported on io-uring driver, and will return an
320    /// [`Unsupported`] io error on all other drivers.
321    ///
322    /// The returned personality can be used with
323    /// [`FutureExt::with_personality`].
324    ///
325    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
326    pub fn register_personality(&self) -> io::Result<u16> {
327        self.driver.borrow_mut().register_personality()
328    }
329
330    /// Unregister the given personality for the runtime.
331    ///
332    /// This is only supported on io-uring driver, and will return an
333    /// [`Unsupported`] io error on all other drivers.
334    ///
335    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
336    pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
337        self.driver.borrow_mut().unregister_personality(personality)
338    }
339}
340
341impl Drop for Runtime {
342    fn drop(&mut self) {
343        // this is not the last runtime reference, no need to clear
344        if Rc::strong_count(&self.executor) > 1 {
345            return;
346        }
347
348        self.enter(|| {
349            self.executor.clear();
350        })
351    }
352}
353
354impl AsRawFd for Runtime {
355    fn as_raw_fd(&self) -> RawFd {
356        self.driver.borrow().as_raw_fd()
357    }
358}
359
#[cfg(feature = "criterion")]
impl criterion::async_executor::AsyncExecutor for Runtime {
    /// Runs the benchmark future via the inherent [`Runtime::block_on`].
    fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
        Runtime::block_on(self, future)
    }
}
366
#[cfg(feature = "criterion")]
impl criterion::async_executor::AsyncExecutor for &Runtime {
    /// Runs the benchmark future on the referenced runtime via the inherent
    /// [`Runtime::block_on`].
    fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
        let runtime: &Runtime = *self;
        Runtime::block_on(runtime, future)
    }
}
373
374/// Builder for [`Runtime`].
375#[derive(Debug, Clone)]
376pub struct RuntimeBuilder {
377    proactor_builder: ProactorBuilder,
378    thread_affinity: HashSet<usize>,
379    sync_queue_size: usize,
380    local_queue_size: usize,
381    event_interval: u32,
382}
383
384impl Default for RuntimeBuilder {
385    fn default() -> Self {
386        Self::new()
387    }
388}
389
390impl RuntimeBuilder {
391    /// Create the builder with default config.
392    pub fn new() -> Self {
393        Self {
394            proactor_builder: ProactorBuilder::new(),
395            event_interval: 61,
396            sync_queue_size: 64,
397            local_queue_size: 64,
398            thread_affinity: HashSet::new(),
399        }
400    }
401
402    /// Replace proactor builder.
403    pub fn with_proactor(&mut self, builder: ProactorBuilder) -> &mut Self {
404        self.proactor_builder = builder;
405        self
406    }
407
408    /// Sets the thread affinity for the runtime.
409    pub fn thread_affinity(&mut self, cpus: HashSet<usize>) -> &mut Self {
410        self.thread_affinity = cpus;
411        self
412    }
413
414    /// Sets the number of scheduler ticks after which the scheduler will poll
415    /// for external events (timers, I/O, and so on).
416    ///
417    /// A scheduler “tick” roughly corresponds to one poll invocation on a task.
418    pub fn event_interval(&mut self, val: usize) -> &mut Self {
419        self.event_interval = val as _;
420        self
421    }
422
423    /// The size of the sync queue, which is used to wake up tasks from other
424    /// threads (remote).
425    ///
426    /// This is fixed and will create backpressure in other remote threads when
427    /// full.
428    pub fn sync_queue_size(&mut self, val: usize) -> &mut Self {
429        self.sync_queue_size = val;
430        self
431    }
432
433    /// The size of the local queues, which is used to wake up tasks within the
434    /// same thread.
435    ///
436    /// This is dynamically resized to avoid blocking.
437    pub fn local_queue_size(&mut self, val: usize) -> &mut Self {
438        self.local_queue_size = val;
439        self
440    }
441
442    /// Build [`Runtime`].
443    pub fn build(&self) -> io::Result<Runtime> {
444        let RuntimeBuilder {
445            proactor_builder,
446            thread_affinity,
447            sync_queue_size,
448            local_queue_size,
449            event_interval,
450        } = self;
451
452        if !thread_affinity.is_empty() {
453            bind_to_cpu_set(thread_affinity);
454        }
455        let driver = proactor_builder.build()?;
456        let executor = Executor::with_config(ExecutorConfig {
457            max_interval: *event_interval,
458            sync_queue_size: *sync_queue_size,
459            local_queue_size: *local_queue_size,
460            waker: Some(driver.waker()),
461        });
462        Ok(Runtime {
463            executor: Rc::new(executor),
464            driver: Rc::new(RefCell::new(driver)),
465            #[cfg(feature = "time")]
466            timer_runtime: Rc::new(RefCell::new(TimerRuntime::new())),
467        })
468    }
469}
470
471/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
472///
473/// Spawning a task enables the task to execute concurrently to other tasks.
474/// There is no guarantee that a spawned task will execute to completion.
475///
476/// ```
477/// # compio_runtime::Runtime::new().unwrap().block_on(async {
478/// use compio_runtime::ResumeUnwind;
479///
480/// let task = compio_runtime::spawn(async {
481///     println!("Hello from a spawned task!");
482///     42
483/// });
484///
485/// assert_eq!(
486///     task.await.resume_unwind().expect("shouldn't be cancelled"),
487///     42
488/// );
489/// # })
490/// ```
491///
492/// ## Panics
493///
494/// This method doesn't create runtime. It tries to obtain the current runtime
495/// by [`Runtime::with_current`].
496pub fn spawn<F: Future + 'static>(future: F) -> JoinHandle<F::Output> {
497    Runtime::with_current(|r| r.spawn(future))
498}
499
500/// Spawns a blocking task in a new thread, and wait for it.
501///
502/// The task will not be cancelled even if the future is dropped.
503///
504/// ## Panics
505///
506/// This method doesn't create runtime. It tries to obtain the current runtime
507/// by [`Runtime::with_current`].
508pub fn spawn_blocking<T: Send + 'static>(
509    f: impl (FnOnce() -> T) + Send + 'static,
510) -> JoinHandle<T> {
511    Runtime::with_current(|r| r.spawn_blocking(f))
512}
513
514/// Submit an operation to the current runtime, and return a future for it.
515///
516/// ## Panics
517///
518/// This method doesn't create runtime and will panic if it's not within a
519/// runtime. It tries to obtain the current runtime with
520/// [`Runtime::with_current`].
521pub fn submit<T: OpCode + 'static>(op: T) -> Submit<T> {
522    Runtime::with_current(|r| r.submit(op))
523}
524
525/// Submit a multishot operation to the current runtime, and return a stream for
526/// it.
527///
528/// ## Panics
529///
530/// This method doesn't create runtime and will panic if it's not within a
531/// runtime. It tries to obtain the current runtime with
532/// [`Runtime::with_current`].
533pub fn submit_multi<T: OpCode + 'static>(op: T) -> SubmitMulti<T> {
534    Runtime::with_current(|r| r.submit_multi(op))
535}
536
537/// Register file descriptors for fixed-file operations with the current
538/// runtime's io_uring instance.
539///
540/// This only works on `io_uring` driver. It will return an [`Unsupported`]
541/// error on other drivers.
542///
543/// ## Panics
544///
545/// This method doesn't create runtime. It tries to obtain the current runtime
546/// by [`Runtime::with_current`].
547///
548/// [`Unsupported`]: std::io::ErrorKind::Unsupported
549pub fn register_files(fds: &[RawFd]) -> io::Result<()> {
550    Runtime::with_current(|r| r.register_files(fds))
551}
552
553/// Unregister previously registered file descriptors from the current
554/// runtime's io_uring instance.
555///
556/// This only works on `io_uring` driver. It will return an [`Unsupported`]
557/// error on other drivers.
558///
559/// ## Panics
560///
561/// This method doesn't create runtime. It tries to obtain the current runtime
562/// by [`Runtime::with_current`].
563///
564/// [`Unsupported`]: std::io::ErrorKind::Unsupported
565pub fn unregister_files() -> io::Result<()> {
566    Runtime::with_current(|r| r.unregister_files())
567}