Skip to main content

compio_runtime/
lib.rs

1//! The compio runtime.
2//!
3//! ```
4//! let ans = compio_runtime::Runtime::new().unwrap().block_on(async {
5//!     println!("Hello world!");
6//!     42
7//! });
8//! assert_eq!(ans, 42);
9//! ```
10
11#![cfg_attr(docsrs, feature(doc_cfg))]
12#![cfg_attr(feature = "current_thread_id", feature(current_thread_id))]
13#![cfg_attr(feature = "read_buf", feature(read_buf, core_io_borrowed_buf))]
14#![allow(unused_features)]
15#![warn(missing_docs)]
16#![deny(rustdoc::broken_intra_doc_links)]
17#![doc(
18    html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
19)]
20#![doc(
21    html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
22)]
23
24mod affinity;
25mod attacher;
26mod cancel;
27mod future;
28mod waker;
29
30pub mod fd;
31
32#[cfg(feature = "time")]
33pub mod time;
34
35use std::{
36    cell::RefCell,
37    collections::HashSet,
38    fmt::Debug,
39    future::Future,
40    io,
41    rc::Rc,
42    task::{Context, Poll, Waker},
43    time::Duration,
44};
45
46use compio_buf::{BufResult, IntoInner};
47use compio_driver::{AsRawFd, DriverType, OpCode, Proactor, ProactorBuilder, RawFd, op::Asyncify};
48pub use compio_driver::{BufferPool, ErrorExt};
49use compio_executor::{Executor, ExecutorConfig};
50pub use compio_executor::{JoinHandle, ResumeUnwind};
51use compio_log::{debug, instrument};
52
53use crate::affinity::bind_to_cpu_set;
54#[cfg(feature = "time")]
55use crate::time::TimerRuntime;
56pub use crate::{attacher::*, cancel::CancelToken, future::*};
57
58scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
59
/// Diverging helper shared by the accessors that require a current runtime.
///
/// Marked `#[cold]` so the panic path stays out of the callers' hot path.
///
/// # Panics
///
/// Always panics with the message `not in a compio runtime`.
#[cold]
fn not_in_compio_runtime() -> ! {
    panic!("not in a compio runtime")
}
64
65/// The async runtime of compio.
66///
67/// It is a thread-local runtime, meaning it cannot be sent to other threads.
68#[derive(Clone)]
69pub struct Runtime {
70    executor: Rc<Executor>,
71    driver: Rc<RefCell<Proactor>>,
72    #[cfg(feature = "time")]
73    timer_runtime: Rc<RefCell<TimerRuntime>>,
74}
75
76impl Debug for Runtime {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        let mut s = f.debug_struct("Runtime");
79        s.field("executor", &self.executor);
80        s.field("driver", &"...");
81        #[cfg(feature = "time")]
82        s.field("timer_runtime", &"...");
83        s.finish()
84    }
85}
86
87impl Runtime {
88    /// Create [`Runtime`] with default config.
89    pub fn new() -> io::Result<Self> {
90        Self::builder().build()
91    }
92
93    /// Create a builder for [`Runtime`].
94    pub fn builder() -> RuntimeBuilder {
95        RuntimeBuilder::new()
96    }
97
98    /// The current driver type.
99    pub fn driver_type(&self) -> DriverType {
100        self.driver.borrow().driver_type()
101    }
102
103    /// Try to perform a function on the current runtime, and if no runtime is
104    /// running, return the function back.
105    pub fn try_with_current<T, F: FnOnce(&Self) -> T>(f: F) -> Result<T, F> {
106        if CURRENT_RUNTIME.is_set() {
107            Ok(CURRENT_RUNTIME.with(f))
108        } else {
109            Err(f)
110        }
111    }
112
113    /// Perform a function on the current runtime.
114    ///
115    /// ## Panics
116    ///
117    /// This method will panic if there is no running [`Runtime`].
118    pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
119        if CURRENT_RUNTIME.is_set() {
120            CURRENT_RUNTIME.with(f)
121        } else {
122            not_in_compio_runtime()
123        }
124    }
125
126    /// Try to get the current runtime, and if no runtime is running, return
127    /// `None`.
128    pub fn try_current() -> Option<Self> {
129        if CURRENT_RUNTIME.is_set() {
130            Some(CURRENT_RUNTIME.with(|r| r.clone()))
131        } else {
132            None
133        }
134    }
135
136    /// Get the current runtime.
137    ///
138    /// # Panics
139    ///
140    /// This method will panic if there is no running [`Runtime`].
141    pub fn current() -> Self {
142        if CURRENT_RUNTIME.is_set() {
143            CURRENT_RUNTIME.with(|r| r.clone())
144        } else {
145            not_in_compio_runtime()
146        }
147    }
148
149    /// Set this runtime as current runtime, and perform a function in the
150    /// current scope.
151    pub fn enter<T, F: FnOnce() -> T>(&self, f: F) -> T {
152        CURRENT_RUNTIME.set(self, f)
153    }
154
155    /// Low level API to control the runtime.
156    ///
157    /// Run the scheduled tasks.
158    ///
159    /// The return value indicates whether there are still tasks in the queue.
160    pub fn run(&self) -> bool {
161        self.executor.tick()
162    }
163
164    /// Low level API to control the runtime.
165    ///
166    /// Create a waker that always notifies the runtime when woken.
167    pub fn waker(&self) -> Waker {
168        self.driver.borrow().waker()
169    }
170
171    /// Block on the future till it completes.
172    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
173        self.enter(|| {
174            let waker = self.waker();
175            let mut context = Context::from_waker(&waker);
176            let mut future = std::pin::pin!(future);
177            loop {
178                if let Poll::Ready(result) = future.as_mut().poll(&mut context) {
179                    self.run();
180                    return result;
181                }
182                let remaining_tasks = self.run();
183                if remaining_tasks {
184                    self.poll_with(Some(Duration::ZERO));
185                } else {
186                    self.poll();
187                }
188            }
189        })
190    }
191
192    /// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
193    ///
194    /// Spawning a task enables the task to execute concurrently to other tasks.
195    /// There is no guarantee that a spawned task will execute to completion.
196    pub fn spawn<F: Future + 'static>(&self, future: F) -> JoinHandle<F::Output> {
197        self.executor.spawn(future)
198    }
199
200    /// Spawns a blocking task in a new thread, and wait for it.
201    ///
202    /// The task will not be cancelled even if the future is dropped.
203    pub fn spawn_blocking<T: Send + 'static>(
204        &self,
205        f: impl (FnOnce() -> T) + Send + 'static,
206    ) -> JoinHandle<T> {
207        use futures_util::FutureExt;
208
209        let op = Asyncify::new(move || {
210            // TODO: Refactor blocking pool and handle panic within worker and propagate it
211            // back
212            let res = f();
213            BufResult(Ok(0), res)
214        });
215        let submit = self.submit(op);
216        self.spawn(submit.map(|res| res.1.into_inner()))
217    }
218
219    /// Attach a raw file descriptor/handle/socket to the runtime.
220    ///
221    /// You only need this when authoring your own high-level APIs. High-level
222    /// resources in this crate are attached automatically.
223    pub fn attach(&self, fd: RawFd) -> io::Result<()> {
224        self.driver.borrow_mut().attach(fd)
225    }
226
227    /// Submit an operation to the runtime.
228    ///
229    /// You only need this when authoring your own [`OpCode`].
230    pub fn submit<T: OpCode + 'static>(&self, op: T) -> Submit<T> {
231        Submit::new(self.driver.clone(), op)
232    }
233
234    /// Submit a multishot operation to the runtime.
235    ///
236    /// You only need this when authoring your own [`OpCode`].
237    pub fn submit_multi<T: OpCode + 'static>(&self, op: T) -> SubmitMulti<T> {
238        SubmitMulti::new(self.driver.clone(), op)
239    }
240
241    /// Flush the driver and return whether the driver has been notified.
242    ///
243    /// See [`Proactor::flush`] for more details.
244    pub fn flush(&self) -> bool {
245        self.driver.borrow_mut().flush()
246    }
247
248    /// Low level API to control the runtime.
249    ///
250    /// Get the timeout value to be passed to [`Proactor::poll`].
251    pub fn current_timeout(&self) -> Option<Duration> {
252        #[cfg(not(feature = "time"))]
253        let timeout = None;
254        #[cfg(feature = "time")]
255        let timeout = self.timer_runtime.borrow().min_timeout();
256        timeout
257    }
258
259    /// Low level API to control the runtime.
260    ///
261    /// Poll the inner proactor. It is equal to calling [`Runtime::poll_with`]
262    /// with [`Runtime::current_timeout`].
263    pub fn poll(&self) {
264        instrument!(compio_log::Level::DEBUG, "poll");
265        let timeout = self.current_timeout();
266        debug!("timeout: {:?}", timeout);
267        self.poll_with(timeout)
268    }
269
270    /// Low level API to control the runtime.
271    ///
272    /// Poll the inner proactor with a custom timeout.
273    pub fn poll_with(&self, timeout: Option<Duration>) {
274        instrument!(compio_log::Level::DEBUG, "poll_with");
275
276        let mut driver = self.driver.borrow_mut();
277        match driver.poll(timeout) {
278            Ok(()) => {}
279            Err(e) => match e.kind() {
280                io::ErrorKind::TimedOut | io::ErrorKind::Interrupted => {
281                    debug!("expected error: {e}");
282                }
283                _ => panic!("{e:?}"),
284            },
285        }
286        #[cfg(feature = "time")]
287        self.timer_runtime.borrow_mut().wake();
288    }
289
290    /// Get buffer pool of the runtime.
291    ///
292    /// This will lazily initialize the pool at the first time it's accessed,
293    /// and future access to the pool will be cheap and infallible.
294    pub fn buffer_pool(&self) -> io::Result<BufferPool> {
295        self.driver.borrow_mut().buffer_pool()
296    }
297
298    /// Register file descriptors for fixed-file operations.
299    ///
300    /// This is only supported on io-uring driver, and will return an
301    /// [`Unsupported`] io error on all other drivers.
302    ///
303    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
304    pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
305        self.driver.borrow_mut().register_files(fds)
306    }
307
308    /// Unregister previously registered file descriptors.
309    ///
310    /// This is only supported on io-uring driver, and will return an
311    /// [`Unsupported`] io error on all other drivers.
312    ///
313    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
314    pub fn unregister_files(&self) -> io::Result<()> {
315        self.driver.borrow_mut().unregister_files()
316    }
317
318    /// Register the personality for the runtime.
319    ///
320    /// This is only supported on io-uring driver, and will return an
321    /// [`Unsupported`] io error on all other drivers.
322    ///
323    /// The returned personality can be used with
324    /// [`FutureExt::with_personality`].
325    ///
326    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
327    pub fn register_personality(&self) -> io::Result<u16> {
328        self.driver.borrow_mut().register_personality()
329    }
330
331    /// Unregister the given personality for the runtime.
332    ///
333    /// This is only supported on io-uring driver, and will return an
334    /// [`Unsupported`] io error on all other drivers.
335    ///
336    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
337    pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
338        self.driver.borrow_mut().unregister_personality(personality)
339    }
340}
341
342impl Drop for Runtime {
343    fn drop(&mut self) {
344        // this is not the last runtime reference, no need to clear
345        if Rc::strong_count(&self.executor) > 1 {
346            return;
347        }
348
349        self.enter(|| {
350            self.executor.clear();
351        })
352    }
353}
354
355impl AsRawFd for Runtime {
356    fn as_raw_fd(&self) -> RawFd {
357        self.driver.borrow().as_raw_fd()
358    }
359}
360
#[cfg(feature = "criterion")]
impl criterion::async_executor::AsyncExecutor for Runtime {
    /// Runs the benchmark future through the inherent [`Runtime::block_on`].
    fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
        Runtime::block_on(self, future)
    }
}
367
#[cfg(feature = "criterion")]
impl criterion::async_executor::AsyncExecutor for &Runtime {
    /// Runs the benchmark future through the inherent [`Runtime::block_on`]
    /// of the referenced runtime.
    fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
        Runtime::block_on(self, future)
    }
}
374
375/// Builder for [`Runtime`].
376#[derive(Debug, Clone)]
377pub struct RuntimeBuilder {
378    proactor_builder: ProactorBuilder,
379    thread_affinity: HashSet<usize>,
380    sync_queue_size: usize,
381    local_queue_size: usize,
382    event_interval: u32,
383}
384
385impl Default for RuntimeBuilder {
386    fn default() -> Self {
387        Self::new()
388    }
389}
390
391impl RuntimeBuilder {
392    /// Create the builder with default config.
393    pub fn new() -> Self {
394        Self {
395            proactor_builder: ProactorBuilder::new(),
396            event_interval: 61,
397            sync_queue_size: 64,
398            local_queue_size: 64,
399            thread_affinity: HashSet::new(),
400        }
401    }
402
403    /// Replace proactor builder.
404    pub fn with_proactor(&mut self, builder: ProactorBuilder) -> &mut Self {
405        self.proactor_builder = builder;
406        self
407    }
408
409    /// Sets the thread affinity for the runtime.
410    pub fn thread_affinity(&mut self, cpus: HashSet<usize>) -> &mut Self {
411        self.thread_affinity = cpus;
412        self
413    }
414
415    /// Sets the number of scheduler ticks after which the scheduler will poll
416    /// for external events (timers, I/O, and so on).
417    ///
418    /// A scheduler “tick” roughly corresponds to one poll invocation on a task.
419    pub fn event_interval(&mut self, val: usize) -> &mut Self {
420        self.event_interval = val as _;
421        self
422    }
423
424    /// The size of the sync queue, which is used to wake up tasks from other
425    /// threads (remote).
426    ///
427    /// This is fixed and will create backpressure in other remote threads when
428    /// full.
429    pub fn sync_queue_size(&mut self, val: usize) -> &mut Self {
430        self.sync_queue_size = val;
431        self
432    }
433
434    /// The size of the local queues, which is used to wake up tasks within the
435    /// same thread.
436    ///
437    /// This is dynamically resized to avoid blocking.
438    pub fn local_queue_size(&mut self, val: usize) -> &mut Self {
439        self.local_queue_size = val;
440        self
441    }
442
443    /// Build [`Runtime`].
444    pub fn build(&self) -> io::Result<Runtime> {
445        let RuntimeBuilder {
446            proactor_builder,
447            thread_affinity,
448            sync_queue_size,
449            local_queue_size,
450            event_interval,
451        } = self;
452
453        if !thread_affinity.is_empty() {
454            bind_to_cpu_set(thread_affinity);
455        }
456        let driver = proactor_builder.build()?;
457        let executor = Executor::with_config(ExecutorConfig {
458            max_interval: *event_interval,
459            sync_queue_size: *sync_queue_size,
460            local_queue_size: *local_queue_size,
461            waker: Some(driver.waker()),
462        });
463        Ok(Runtime {
464            executor: Rc::new(executor),
465            driver: Rc::new(RefCell::new(driver)),
466            #[cfg(feature = "time")]
467            timer_runtime: Rc::new(RefCell::new(TimerRuntime::new())),
468        })
469    }
470}
471
472/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
473///
474/// Spawning a task enables the task to execute concurrently to other tasks.
475/// There is no guarantee that a spawned task will execute to completion.
476///
477/// ```
478/// # compio_runtime::Runtime::new().unwrap().block_on(async {
479/// use compio_runtime::ResumeUnwind;
480///
481/// let task = compio_runtime::spawn(async {
482///     println!("Hello from a spawned task!");
483///     42
484/// });
485///
486/// assert_eq!(
487///     task.await.resume_unwind().expect("shouldn't be cancelled"),
488///     42
489/// );
490/// # })
491/// ```
492///
493/// ## Panics
494///
495/// This method doesn't create runtime. It tries to obtain the current runtime
496/// by [`Runtime::with_current`].
497pub fn spawn<F: Future + 'static>(future: F) -> JoinHandle<F::Output> {
498    Runtime::with_current(|r| r.spawn(future))
499}
500
501/// Spawns a blocking task in a new thread, and wait for it.
502///
503/// The task will not be cancelled even if the future is dropped.
504///
505/// ## Panics
506///
507/// This method doesn't create runtime. It tries to obtain the current runtime
508/// by [`Runtime::with_current`].
509pub fn spawn_blocking<T: Send + 'static>(
510    f: impl (FnOnce() -> T) + Send + 'static,
511) -> JoinHandle<T> {
512    Runtime::with_current(|r| r.spawn_blocking(f))
513}
514
515/// Submit an operation to the current runtime, and return a future for it.
516///
517/// ## Panics
518///
519/// This method doesn't create runtime and will panic if it's not within a
520/// runtime. It tries to obtain the current runtime with
521/// [`Runtime::with_current`].
522pub fn submit<T: OpCode + 'static>(op: T) -> Submit<T> {
523    Runtime::with_current(|r| r.submit(op))
524}
525
526/// Submit a multishot operation to the current runtime, and return a stream for
527/// it.
528///
529/// ## Panics
530///
531/// This method doesn't create runtime and will panic if it's not within a
532/// runtime. It tries to obtain the current runtime with
533/// [`Runtime::with_current`].
534pub fn submit_multi<T: OpCode + 'static>(op: T) -> SubmitMulti<T> {
535    Runtime::with_current(|r| r.submit_multi(op))
536}
537
538/// Register file descriptors for fixed-file operations with the current
539/// runtime's io_uring instance.
540///
541/// This only works on `io_uring` driver. It will return an [`Unsupported`]
542/// error on other drivers.
543///
544/// ## Panics
545///
546/// This method doesn't create runtime. It tries to obtain the current runtime
547/// by [`Runtime::with_current`].
548///
549/// [`Unsupported`]: std::io::ErrorKind::Unsupported
550pub fn register_files(fds: &[RawFd]) -> io::Result<()> {
551    Runtime::with_current(|r| r.register_files(fds))
552}
553
554/// Unregister previously registered file descriptors from the current
555/// runtime's io_uring instance.
556///
557/// This only works on `io_uring` driver. It will return an [`Unsupported`]
558/// error on other drivers.
559///
560/// ## Panics
561///
562/// This method doesn't create runtime. It tries to obtain the current runtime
563/// by [`Runtime::with_current`].
564///
565/// [`Unsupported`]: std::io::ErrorKind::Unsupported
566pub fn unregister_files() -> io::Result<()> {
567    Runtime::with_current(|r| r.unregister_files())
568}