Skip to main content

compio_runtime/
lib.rs

1//! The compio runtime.
2//!
3//! ```
4//! let ans = compio_runtime::Runtime::new().unwrap().block_on(async {
5//!     println!("Hello world!");
6//!     42
7//! });
8//! assert_eq!(ans, 42);
9//! ```
10
11#![cfg_attr(docsrs, feature(doc_cfg))]
12#![cfg_attr(feature = "current_thread_id", feature(current_thread_id))]
13#![allow(unused_features)]
14#![warn(missing_docs)]
15#![deny(rustdoc::broken_intra_doc_links)]
16#![doc(
17    html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
18)]
19#![doc(
20    html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
21)]
22
23mod affinity;
24mod attacher;
25mod cancel;
26mod future;
27mod waker;
28
29pub mod fd;
30
31#[cfg(feature = "time")]
32pub mod time;
33
34use std::{
35    cell::RefCell,
36    collections::HashSet,
37    fmt::Debug,
38    future::Future,
39    io,
40    ops::Deref,
41    rc::Rc,
42    sync::Arc,
43    task::{Context, Poll, Waker},
44    time::Duration,
45};
46
47use compio_buf::{BufResult, IntoInner};
48use compio_driver::{
49    AsRawFd, Cancel, DriverType, Extra, Key, OpCode, Proactor, ProactorBuilder, PushEntry, RawFd,
50    op::Asyncify,
51};
52pub use compio_driver::{BufferPool, ErrorExt};
53use compio_executor::{Executor, ExecutorConfig};
54pub use compio_executor::{JoinHandle, ResumeUnwind};
55use compio_log::{debug, instrument};
56
57use crate::affinity::bind_to_cpu_set;
58#[cfg(feature = "time")]
59use crate::time::{TimerFuture, TimerKey, TimerRuntime};
60pub use crate::{attacher::*, cancel::CancelToken, future::*, waker::OptWaker};
61
62scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
63
/// Shared panic path used when an API that needs a current runtime is called
/// while none is set.
///
/// Marked `#[cold]` so the compiler treats this diverging call as unlikely.
#[cold]
fn not_in_compio_runtime() -> ! {
    panic!("not in a compio runtime")
}
68
69/// Inner structure of [`Runtime`].
70pub struct RuntimeInner {
71    executor: Executor,
72    driver: RefCell<Proactor>,
73    #[cfg(feature = "time")]
74    timer_runtime: RefCell<TimerRuntime>,
75}
76
77/// The async runtime of compio.
78///
79/// It is a thread-local runtime, meaning it cannot be sent to other threads.
80#[derive(Clone)]
81pub struct Runtime(Rc<RuntimeInner>);
82
83impl Debug for Runtime {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        let mut s = f.debug_struct("Runtime");
86        s.field("executor", &self.0.executor)
87            .field("driver", &"...")
88            .field("scheduler", &"...");
89        #[cfg(feature = "time")]
90        s.field("timer_runtime", &"...");
91        s.finish()
92    }
93}
94
95impl Deref for Runtime {
96    type Target = RuntimeInner;
97
98    fn deref(&self) -> &Self::Target {
99        &self.0
100    }
101}
102
103impl Runtime {
104    /// Create [`Runtime`] with default config.
105    pub fn new() -> io::Result<Self> {
106        Self::builder().build()
107    }
108
109    /// Create a builder for [`Runtime`].
110    pub fn builder() -> RuntimeBuilder {
111        RuntimeBuilder::new()
112    }
113
114    /// The current driver type.
115    pub fn driver_type(&self) -> DriverType {
116        self.driver.borrow().driver_type()
117    }
118
119    /// Try to perform a function on the current runtime, and if no runtime is
120    /// running, return the function back.
121    pub fn try_with_current<T, F: FnOnce(&Self) -> T>(f: F) -> Result<T, F> {
122        if CURRENT_RUNTIME.is_set() {
123            Ok(CURRENT_RUNTIME.with(f))
124        } else {
125            Err(f)
126        }
127    }
128
129    /// Perform a function on the current runtime.
130    ///
131    /// ## Panics
132    ///
133    /// This method will panic if there is no running [`Runtime`].
134    pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
135        if CURRENT_RUNTIME.is_set() {
136            CURRENT_RUNTIME.with(f)
137        } else {
138            not_in_compio_runtime()
139        }
140    }
141
142    /// Try to get the current runtime, and if no runtime is running, return
143    /// `None`.
144    pub fn try_current() -> Option<Self> {
145        if CURRENT_RUNTIME.is_set() {
146            Some(CURRENT_RUNTIME.with(|r| r.clone()))
147        } else {
148            None
149        }
150    }
151
152    /// Get the current runtime.
153    ///
154    /// # Panics
155    ///
156    /// This method will panic if there is no running [`Runtime`].
157    pub fn current() -> Self {
158        if CURRENT_RUNTIME.is_set() {
159            CURRENT_RUNTIME.with(|r| r.clone())
160        } else {
161            not_in_compio_runtime()
162        }
163    }
164
165    /// Set this runtime as current runtime, and perform a function in the
166    /// current scope.
167    pub fn enter<T, F: FnOnce() -> T>(&self, f: F) -> T {
168        CURRENT_RUNTIME.set(self, f)
169    }
170
171    /// Low level API to control the runtime.
172    ///
173    /// Run the scheduled tasks.
174    ///
175    /// The return value indicates whether there are still tasks in the queue.
176    pub fn run(&self) -> bool {
177        self.executor.tick()
178    }
179
180    /// Low level API to control the runtime.
181    ///
182    /// Create a waker that always notifies the runtime when woken.
183    pub fn waker(&self) -> Waker {
184        self.driver.borrow().waker()
185    }
186
187    /// Low level API to control the runtime.
188    ///
189    /// Create an optimized waker that only notifies the runtime when woken
190    /// from another thread, or when `notify-always` is enabled.
191    pub fn opt_waker(&self) -> Arc<OptWaker> {
192        OptWaker::new(self.waker())
193    }
194
195    /// Block on the future till it completes.
196    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
197        self.enter(|| {
198            let opt_waker = self.opt_waker();
199            let waker = Waker::from(opt_waker.clone());
200            let mut context = Context::from_waker(&waker);
201            let mut future = std::pin::pin!(future);
202            loop {
203                if let Poll::Ready(result) = future.as_mut().poll(&mut context) {
204                    self.run();
205                    return result;
206                }
207                // We always want to reset the waker here.
208                let remaining_tasks = self.run() | opt_waker.reset();
209                if remaining_tasks {
210                    self.poll_with(Some(Duration::ZERO));
211                } else {
212                    self.poll();
213                }
214            }
215        })
216    }
217
218    /// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
219    ///
220    /// Spawning a task enables the task to execute concurrently to other tasks.
221    /// There is no guarantee that a spawned task will execute to completion.
222    pub fn spawn<F: Future + 'static>(&self, future: F) -> JoinHandle<F::Output> {
223        self.0.executor.spawn(future)
224    }
225
226    /// Spawns a blocking task in a new thread, and wait for it.
227    ///
228    /// The task will not be cancelled even if the future is dropped.
229    pub fn spawn_blocking<T: Send + 'static>(
230        &self,
231        f: impl (FnOnce() -> T) + Send + 'static,
232    ) -> JoinHandle<T> {
233        let op = Asyncify::new(move || {
234            // TODO: Refactor blocking pool and handle panic within worker and propagate it
235            // back
236            let res = f();
237            BufResult(Ok(0), res)
238        });
239        let submit = self.submit(op);
240        self.spawn(async move { submit.await.1.into_inner() })
241    }
242
243    /// Attach a raw file descriptor/handle/socket to the runtime.
244    ///
245    /// You only need this when authoring your own high-level APIs. High-level
246    /// resources in this crate are attached automatically.
247    pub fn attach(&self, fd: RawFd) -> io::Result<()> {
248        self.driver.borrow_mut().attach(fd)
249    }
250
251    fn submit_raw<T: OpCode + 'static>(
252        &self,
253        op: T,
254        extra: Option<Extra>,
255    ) -> PushEntry<Key<T>, BufResult<usize, T>> {
256        let mut this = self.driver.borrow_mut();
257        match extra {
258            Some(e) => this.push_with_extra(op, e),
259            None => this.push(op),
260        }
261    }
262
263    fn default_extra(&self) -> Extra {
264        self.driver.borrow().default_extra()
265    }
266
267    /// Submit an operation to the runtime.
268    ///
269    /// You only need this when authoring your own [`OpCode`].
270    pub fn submit<T: OpCode + 'static>(&self, op: T) -> Submit<T> {
271        Submit::new(self.clone(), op)
272    }
273
274    /// Submit a multishot operation to the runtime.
275    ///
276    /// You only need this when authoring your own [`OpCode`].
277    pub fn submit_multi<T: OpCode + 'static>(&self, op: T) -> SubmitMulti<T> {
278        SubmitMulti::new(self.clone(), op)
279    }
280
281    pub(crate) fn cancel<T: OpCode>(&self, key: Key<T>) {
282        self.driver.borrow_mut().cancel(key);
283    }
284
285    pub(crate) fn register_cancel<T: OpCode>(&self, key: &Key<T>) -> Cancel {
286        self.driver.borrow_mut().register_cancel(key)
287    }
288
289    pub(crate) fn cancel_token(&self, token: Cancel) -> bool {
290        self.driver.borrow_mut().cancel_token(token)
291    }
292
293    #[cfg(feature = "time")]
294    pub(crate) fn cancel_timer(&self, key: &TimerKey) {
295        self.timer_runtime.borrow_mut().cancel(key);
296    }
297
298    pub(crate) fn poll_task<T: OpCode>(
299        &self,
300        waker: &Waker,
301        key: Key<T>,
302    ) -> PushEntry<Key<T>, BufResult<usize, T>> {
303        instrument!(compio_log::Level::DEBUG, "poll_task", ?key);
304        let mut driver = self.driver.borrow_mut();
305        driver.pop(key).map_pending(|k| {
306            driver.update_waker(&k, waker);
307            k
308        })
309    }
310
311    pub(crate) fn poll_task_with_extra<T: OpCode>(
312        &self,
313        waker: &Waker,
314        key: Key<T>,
315    ) -> PushEntry<Key<T>, (BufResult<usize, T>, Extra)> {
316        instrument!(compio_log::Level::DEBUG, "poll_task_with_extra", ?key);
317        let mut driver = self.driver.borrow_mut();
318        driver.pop_with_extra(key).map_pending(|k| {
319            driver.update_waker(&k, waker);
320            k
321        })
322    }
323
324    pub(crate) fn poll_multishot<T: OpCode>(
325        &self,
326        waker: &Waker,
327        key: &Key<T>,
328    ) -> Option<BufResult<usize, Extra>> {
329        instrument!(compio_log::Level::DEBUG, "poll_multishot", ?key);
330        let mut driver = self.driver.borrow_mut();
331        if let Some(res) = driver.pop_multishot(key) {
332            return Some(res);
333        }
334        driver.update_waker(key, waker);
335        None
336    }
337
338    #[cfg(feature = "time")]
339    pub(crate) fn poll_timer(&self, cx: &mut Context, key: &TimerKey) -> Poll<()> {
340        instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key);
341        let mut timer_runtime = self.timer_runtime.borrow_mut();
342        if timer_runtime.is_completed(key) {
343            debug!("ready");
344            Poll::Ready(())
345        } else {
346            debug!("pending");
347            timer_runtime.update_waker(key, cx.waker());
348            Poll::Pending
349        }
350    }
351
352    /// Low level API to control the runtime.
353    ///
354    /// Get the timeout value to be passed to [`Proactor::poll`].
355    pub fn current_timeout(&self) -> Option<Duration> {
356        #[cfg(not(feature = "time"))]
357        let timeout = None;
358        #[cfg(feature = "time")]
359        let timeout = self.timer_runtime.borrow().min_timeout();
360        timeout
361    }
362
363    /// Low level API to control the runtime.
364    ///
365    /// Poll the inner proactor. It is equal to calling [`Runtime::poll_with`]
366    /// with [`Runtime::current_timeout`].
367    pub fn poll(&self) {
368        instrument!(compio_log::Level::DEBUG, "poll");
369        let timeout = self.current_timeout();
370        debug!("timeout: {:?}", timeout);
371        self.poll_with(timeout)
372    }
373
374    /// Low level API to control the runtime.
375    ///
376    /// Poll the inner proactor with a custom timeout.
377    pub fn poll_with(&self, timeout: Option<Duration>) {
378        instrument!(compio_log::Level::DEBUG, "poll_with");
379
380        let mut driver = self.driver.borrow_mut();
381        match driver.poll(timeout) {
382            Ok(()) => {}
383            Err(e) => match e.kind() {
384                io::ErrorKind::TimedOut | io::ErrorKind::Interrupted => {
385                    debug!("expected error: {e}");
386                }
387                _ => panic!("{e:?}"),
388            },
389        }
390        #[cfg(feature = "time")]
391        self.timer_runtime.borrow_mut().wake();
392    }
393
394    /// Get buffer pool of the runtime.
395    ///
396    /// This will lazily initialize the pool at the first time it's accessed,
397    /// and future access to the pool will be cheap and infallible.
398    pub fn buffer_pool(&self) -> io::Result<BufferPool> {
399        self.driver.borrow_mut().buffer_pool()
400    }
401
402    /// Register file descriptors for fixed-file operations.
403    ///
404    /// This is only supported on io-uring driver, and will return an
405    /// [`Unsupported`] io error on all other drivers.
406    ///
407    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
408    pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
409        self.driver.borrow_mut().register_files(fds)
410    }
411
412    /// Unregister previously registered file descriptors.
413    ///
414    /// This is only supported on io-uring driver, and will return an
415    /// [`Unsupported`] io error on all other drivers.
416    ///
417    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
418    pub fn unregister_files(&self) -> io::Result<()> {
419        self.driver.borrow_mut().unregister_files()
420    }
421
422    /// Register the personality for the runtime.
423    ///
424    /// This is only supported on io-uring driver, and will return an
425    /// [`Unsupported`] io error on all other drivers.
426    ///
427    /// The returned personality can be used with
428    /// [`FutureExt::with_personality`].
429    ///
430    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
431    pub fn register_personality(&self) -> io::Result<u16> {
432        self.driver.borrow_mut().register_personality()
433    }
434
435    /// Unregister the given personality for the runtime.
436    ///
437    /// This is only supported on io-uring driver, and will return an
438    /// [`Unsupported`] io error on all other drivers.
439    ///
440    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
441    pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
442        self.driver.borrow_mut().unregister_personality(personality)
443    }
444}
445
446impl Drop for Runtime {
447    fn drop(&mut self) {
448        // this is not the last runtime reference, no need to clear
449        if Rc::strong_count(&self.0) > 1 {
450            return;
451        }
452
453        self.enter(|| {
454            self.executor.clear();
455        })
456    }
457}
458
459impl AsRawFd for Runtime {
460    fn as_raw_fd(&self) -> RawFd {
461        self.driver.borrow().as_raw_fd()
462    }
463}
464
// Lets criterion benchmarks use a `Runtime` directly as their async executor.
#[cfg(feature = "criterion")]
impl criterion::async_executor::AsyncExecutor for Runtime {
    /// Delegate to the inherent [`Runtime::block_on`].
    fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
        Runtime::block_on(self, future)
    }
}
471
// Same as the impl for `Runtime`, but for a borrowed runtime, so a benchmark
// does not have to give up ownership.
#[cfg(feature = "criterion")]
impl criterion::async_executor::AsyncExecutor for &Runtime {
    /// Delegate to the inherent [`Runtime::block_on`] of the referenced
    /// runtime.
    fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
        Runtime::block_on(*self, future)
    }
}
478
479/// Builder for [`Runtime`].
480#[derive(Debug, Clone)]
481pub struct RuntimeBuilder {
482    proactor_builder: ProactorBuilder,
483    thread_affinity: HashSet<usize>,
484    sync_queue_size: usize,
485    local_queue_size: usize,
486    event_interval: u32,
487}
488
489impl Default for RuntimeBuilder {
490    fn default() -> Self {
491        Self::new()
492    }
493}
494
495impl RuntimeBuilder {
496    /// Create the builder with default config.
497    pub fn new() -> Self {
498        Self {
499            proactor_builder: ProactorBuilder::new(),
500            event_interval: 61,
501            sync_queue_size: 64,
502            local_queue_size: 64,
503            thread_affinity: HashSet::new(),
504        }
505    }
506
507    /// Replace proactor builder.
508    pub fn with_proactor(&mut self, builder: ProactorBuilder) -> &mut Self {
509        self.proactor_builder = builder;
510        self
511    }
512
513    /// Sets the thread affinity for the runtime.
514    pub fn thread_affinity(&mut self, cpus: HashSet<usize>) -> &mut Self {
515        self.thread_affinity = cpus;
516        self
517    }
518
519    /// Sets the number of scheduler ticks after which the scheduler will poll
520    /// for external events (timers, I/O, and so on).
521    ///
522    /// A scheduler “tick” roughly corresponds to one poll invocation on a task.
523    pub fn event_interval(&mut self, val: usize) -> &mut Self {
524        self.event_interval = val as _;
525        self
526    }
527
528    /// The size of the sync queue, which is used to wake up tasks from other
529    /// threads (remote).
530    ///
531    /// This is fixed and will create backpressure in other remote threads when
532    /// full.
533    pub fn sync_queue_size(&mut self, val: usize) -> &mut Self {
534        self.sync_queue_size = val;
535        self
536    }
537
538    /// The size of the local queues, which is used to wake up tasks within the
539    /// same thread.
540    ///
541    /// This is dynamically resized to avoid blocking.
542    pub fn local_queue_size(&mut self, val: usize) -> &mut Self {
543        self.local_queue_size = val;
544        self
545    }
546
547    /// Build [`Runtime`].
548    pub fn build(&self) -> io::Result<Runtime> {
549        let RuntimeBuilder {
550            proactor_builder,
551            thread_affinity,
552            sync_queue_size,
553            local_queue_size,
554            event_interval,
555        } = self;
556
557        if !thread_affinity.is_empty() {
558            bind_to_cpu_set(thread_affinity);
559        }
560        let driver = proactor_builder.build()?;
561        let executor = Executor::with_config(ExecutorConfig {
562            max_interval: *event_interval,
563            sync_queue_size: *sync_queue_size,
564            local_queue_size: *local_queue_size,
565            waker: Some(driver.waker()),
566        });
567        let inner = RuntimeInner {
568            executor,
569            driver: RefCell::new(driver),
570            #[cfg(feature = "time")]
571            timer_runtime: RefCell::new(TimerRuntime::new()),
572        };
573        Ok(Runtime(Rc::new(inner)))
574    }
575}
576
577/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
578///
579/// Spawning a task enables the task to execute concurrently to other tasks.
580/// There is no guarantee that a spawned task will execute to completion.
581///
582/// ```
583/// # compio_runtime::Runtime::new().unwrap().block_on(async {
584/// use compio_runtime::ResumeUnwind;
585///
586/// let task = compio_runtime::spawn(async {
587///     println!("Hello from a spawned task!");
588///     42
589/// });
590///
591/// assert_eq!(
592///     task.await.resume_unwind().expect("shouldn't be cancelled"),
593///     42
594/// );
595/// # })
596/// ```
597///
598/// ## Panics
599///
600/// This method doesn't create runtime. It tries to obtain the current runtime
601/// by [`Runtime::with_current`].
602pub fn spawn<F: Future + 'static>(future: F) -> JoinHandle<F::Output> {
603    Runtime::with_current(|r| r.spawn(future))
604}
605
606/// Spawns a blocking task in a new thread, and wait for it.
607///
608/// The task will not be cancelled even if the future is dropped.
609///
610/// ## Panics
611///
612/// This method doesn't create runtime. It tries to obtain the current runtime
613/// by [`Runtime::with_current`].
614pub fn spawn_blocking<T: Send + 'static>(
615    f: impl (FnOnce() -> T) + Send + 'static,
616) -> JoinHandle<T> {
617    Runtime::with_current(|r| r.spawn_blocking(f))
618}
619
620/// Submit an operation to the current runtime, and return a future for it.
621///
622/// ## Panics
623///
624/// This method doesn't create runtime and will panic if it's not within a
625/// runtime. It tries to obtain the current runtime with
626/// [`Runtime::with_current`].
627pub fn submit<T: OpCode + 'static>(op: T) -> Submit<T> {
628    Runtime::with_current(|r| r.submit(op))
629}
630
631/// Submit a multishot operation to the current runtime, and return a stream for
632/// it.
633///
634/// ## Panics
635///
636/// This method doesn't create runtime and will panic if it's not within a
637/// runtime. It tries to obtain the current runtime with
638/// [`Runtime::with_current`].
639pub fn submit_multi<T: OpCode + 'static>(op: T) -> SubmitMulti<T> {
640    Runtime::with_current(|r| r.submit_multi(op))
641}
642
643/// Register file descriptors for fixed-file operations with the current
644/// runtime's io_uring instance.
645///
646/// This only works on `io_uring` driver. It will return an [`Unsupported`]
647/// error on other drivers.
648///
649/// ## Panics
650///
651/// This method doesn't create runtime. It tries to obtain the current runtime
652/// by [`Runtime::with_current`].
653///
654/// [`Unsupported`]: std::io::ErrorKind::Unsupported
655pub fn register_files(fds: &[RawFd]) -> io::Result<()> {
656    Runtime::with_current(|r| r.register_files(fds))
657}
658
659/// Unregister previously registered file descriptors from the current
660/// runtime's io_uring instance.
661///
662/// This only works on `io_uring` driver. It will return an [`Unsupported`]
663/// error on other drivers.
664///
665/// ## Panics
666///
667/// This method doesn't create runtime. It tries to obtain the current runtime
668/// by [`Runtime::with_current`].
669///
670/// [`Unsupported`]: std::io::ErrorKind::Unsupported
671pub fn unregister_files() -> io::Result<()> {
672    Runtime::with_current(|r| r.unregister_files())
673}
674
/// Insert a timer for `instant` into the current runtime's timer runtime and
/// wait for it.
///
/// When the insertion yields no key, this returns immediately without
/// waiting. Panics (through [`Runtime::with_current`]) when no runtime is
/// running.
#[cfg(feature = "time")]
pub(crate) async fn create_timer(instant: std::time::Instant) {
    let inserted = Runtime::with_current(|rt| rt.timer_runtime.borrow_mut().insert(instant));
    let Some(timer_key) = inserted else {
        return;
    };
    TimerFuture::new(timer_key).await
}
681}