Skip to main content

compio_runtime/
lib.rs

1//! The compio runtime.
2//!
3//! ```
4//! let ans = compio_runtime::Runtime::new().unwrap().block_on(async {
5//!     println!("Hello world!");
6//!     42
7//! });
8//! assert_eq!(ans, 42);
9//! ```
10
11#![cfg_attr(docsrs, feature(doc_cfg))]
12#![cfg_attr(feature = "current_thread_id", feature(current_thread_id))]
13#![allow(unused_features)]
14#![warn(missing_docs)]
15#![deny(rustdoc::broken_intra_doc_links)]
16#![doc(
17    html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
18)]
19#![doc(
20    html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
21)]
22
23mod affinity;
24mod attacher;
25mod cancel;
26mod future;
27mod waker;
28
29pub mod fd;
30
31#[cfg(feature = "time")]
32pub mod time;
33
34use std::{
35    cell::RefCell,
36    collections::HashSet,
37    fmt::Debug,
38    future::Future,
39    io,
40    ops::Deref,
41    rc::Rc,
42    task::{Context, Poll, Waker},
43    time::Duration,
44};
45
46use compio_buf::{BufResult, IntoInner};
47use compio_driver::{
48    AsRawFd, Cancel, DriverType, Extra, Key, OpCode, Proactor, ProactorBuilder, PushEntry, RawFd,
49    op::Asyncify,
50};
51pub use compio_driver::{BufferPool, ErrorExt};
52use compio_executor::{Executor, ExecutorConfig};
53pub use compio_executor::{JoinHandle, ResumeUnwind};
54use compio_log::{debug, instrument};
55
56use crate::affinity::bind_to_cpu_set;
57#[cfg(feature = "time")]
58use crate::time::{TimerFuture, TimerKey, TimerRuntime};
59pub use crate::{attacher::*, cancel::CancelToken, future::*};
60
61scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
62
63#[cold]
64fn not_in_compio_runtime() -> ! {
65    panic!("not in a compio runtime")
66}
67
68/// Inner structure of [`Runtime`].
69pub struct RuntimeInner {
70    executor: Executor,
71    driver: RefCell<Proactor>,
72    #[cfg(feature = "time")]
73    timer_runtime: RefCell<TimerRuntime>,
74}
75
76/// The async runtime of compio.
77///
78/// It is a thread-local runtime, meaning it cannot be sent to other threads.
79#[derive(Clone)]
80pub struct Runtime(Rc<RuntimeInner>);
81
82impl Debug for Runtime {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        let mut s = f.debug_struct("Runtime");
85        s.field("executor", &self.0.executor)
86            .field("driver", &"...")
87            .field("scheduler", &"...");
88        #[cfg(feature = "time")]
89        s.field("timer_runtime", &"...");
90        s.finish()
91    }
92}
93
94impl Deref for Runtime {
95    type Target = RuntimeInner;
96
97    fn deref(&self) -> &Self::Target {
98        &self.0
99    }
100}
101
102impl Runtime {
103    /// Create [`Runtime`] with default config.
104    pub fn new() -> io::Result<Self> {
105        Self::builder().build()
106    }
107
108    /// Create a builder for [`Runtime`].
109    pub fn builder() -> RuntimeBuilder {
110        RuntimeBuilder::new()
111    }
112
113    /// The current driver type.
114    pub fn driver_type(&self) -> DriverType {
115        self.driver.borrow().driver_type()
116    }
117
118    /// Try to perform a function on the current runtime, and if no runtime is
119    /// running, return the function back.
120    pub fn try_with_current<T, F: FnOnce(&Self) -> T>(f: F) -> Result<T, F> {
121        if CURRENT_RUNTIME.is_set() {
122            Ok(CURRENT_RUNTIME.with(f))
123        } else {
124            Err(f)
125        }
126    }
127
128    /// Perform a function on the current runtime.
129    ///
130    /// ## Panics
131    ///
132    /// This method will panic if there is no running [`Runtime`].
133    pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
134        if CURRENT_RUNTIME.is_set() {
135            CURRENT_RUNTIME.with(f)
136        } else {
137            not_in_compio_runtime()
138        }
139    }
140
141    /// Try to get the current runtime, and if no runtime is running, return
142    /// `None`.
143    pub fn try_current() -> Option<Self> {
144        if CURRENT_RUNTIME.is_set() {
145            Some(CURRENT_RUNTIME.with(|r| r.clone()))
146        } else {
147            None
148        }
149    }
150
151    /// Get the current runtime.
152    ///
153    /// # Panics
154    ///
155    /// This method will panic if there is no running [`Runtime`].
156    pub fn current() -> Self {
157        if CURRENT_RUNTIME.is_set() {
158            CURRENT_RUNTIME.with(|r| r.clone())
159        } else {
160            not_in_compio_runtime()
161        }
162    }
163
164    /// Set this runtime as current runtime, and perform a function in the
165    /// current scope.
166    pub fn enter<T, F: FnOnce() -> T>(&self, f: F) -> T {
167        CURRENT_RUNTIME.set(self, f)
168    }
169
170    /// Low level API to control the runtime.
171    ///
172    /// Run the scheduled tasks.
173    ///
174    /// The return value indicates whether there are still tasks in the queue.
175    pub fn run(&self) -> bool {
176        self.executor.tick()
177    }
178
179    /// Low level API to control the runtime.
180    ///
181    /// Create a waker that always notifies the runtime when woken.
182    pub fn waker(&self) -> Waker {
183        self.driver.borrow().waker()
184    }
185
186    /// Block on the future till it completes.
187    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
188        self.enter(|| {
189            let waker = self.waker();
190            let mut context = Context::from_waker(&waker);
191            let mut future = std::pin::pin!(future);
192            loop {
193                if let Poll::Ready(result) = future.as_mut().poll(&mut context) {
194                    self.run();
195                    return result;
196                }
197                let remaining_tasks = self.run();
198                if remaining_tasks {
199                    self.poll_with(Some(Duration::ZERO));
200                } else {
201                    self.poll();
202                }
203            }
204        })
205    }
206
207    /// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
208    ///
209    /// Spawning a task enables the task to execute concurrently to other tasks.
210    /// There is no guarantee that a spawned task will execute to completion.
211    pub fn spawn<F: Future + 'static>(&self, future: F) -> JoinHandle<F::Output> {
212        self.0.executor.spawn(future)
213    }
214
215    /// Spawns a blocking task in a new thread, and wait for it.
216    ///
217    /// The task will not be cancelled even if the future is dropped.
218    pub fn spawn_blocking<T: Send + 'static>(
219        &self,
220        f: impl (FnOnce() -> T) + Send + 'static,
221    ) -> JoinHandle<T> {
222        let op = Asyncify::new(move || {
223            // TODO: Refactor blocking pool and handle panic within worker and propagate it
224            // back
225            let res = f();
226            BufResult(Ok(0), res)
227        });
228        let submit = self.submit(op);
229        self.spawn(async move { submit.await.1.into_inner() })
230    }
231
232    /// Attach a raw file descriptor/handle/socket to the runtime.
233    ///
234    /// You only need this when authoring your own high-level APIs. High-level
235    /// resources in this crate are attached automatically.
236    pub fn attach(&self, fd: RawFd) -> io::Result<()> {
237        self.driver.borrow_mut().attach(fd)
238    }
239
240    fn submit_raw<T: OpCode + 'static>(
241        &self,
242        op: T,
243        extra: Option<Extra>,
244    ) -> PushEntry<Key<T>, BufResult<usize, T>> {
245        let mut this = self.driver.borrow_mut();
246        match extra {
247            Some(e) => this.push_with_extra(op, e),
248            None => this.push(op),
249        }
250    }
251
252    fn default_extra(&self) -> Extra {
253        self.driver.borrow().default_extra()
254    }
255
256    /// Submit an operation to the runtime.
257    ///
258    /// You only need this when authoring your own [`OpCode`].
259    pub fn submit<T: OpCode + 'static>(&self, op: T) -> Submit<T> {
260        Submit::new(self.clone(), op)
261    }
262
263    /// Submit a multishot operation to the runtime.
264    ///
265    /// You only need this when authoring your own [`OpCode`].
266    pub fn submit_multi<T: OpCode + 'static>(&self, op: T) -> SubmitMulti<T> {
267        SubmitMulti::new(self.clone(), op)
268    }
269
270    /// Flush the driver and return whether the driver has been notified.
271    ///
272    /// See [`Proactor::flush`] for more details.
273    pub fn flush(&self) -> bool {
274        self.driver.borrow_mut().flush()
275    }
276
277    pub(crate) fn cancel<T: OpCode>(&self, key: Key<T>) {
278        self.driver.borrow_mut().cancel(key);
279    }
280
281    pub(crate) fn register_cancel<T: OpCode>(&self, key: &Key<T>) -> Cancel {
282        self.driver.borrow_mut().register_cancel(key)
283    }
284
285    pub(crate) fn cancel_token(&self, token: Cancel) -> bool {
286        self.driver.borrow_mut().cancel_token(token)
287    }
288
289    #[cfg(feature = "time")]
290    pub(crate) fn cancel_timer(&self, key: &TimerKey) {
291        self.timer_runtime.borrow_mut().cancel(key);
292    }
293
294    pub(crate) fn poll_task<T: OpCode>(
295        &self,
296        waker: &Waker,
297        key: Key<T>,
298    ) -> PushEntry<Key<T>, BufResult<usize, T>> {
299        instrument!(compio_log::Level::DEBUG, "poll_task", ?key);
300        let mut driver = self.driver.borrow_mut();
301        driver.pop(key).map_pending(|k| {
302            driver.update_waker(&k, waker);
303            k
304        })
305    }
306
307    pub(crate) fn poll_task_with_extra<T: OpCode>(
308        &self,
309        waker: &Waker,
310        key: Key<T>,
311    ) -> PushEntry<Key<T>, (BufResult<usize, T>, Extra)> {
312        instrument!(compio_log::Level::DEBUG, "poll_task_with_extra", ?key);
313        let mut driver = self.driver.borrow_mut();
314        driver.pop_with_extra(key).map_pending(|k| {
315            driver.update_waker(&k, waker);
316            k
317        })
318    }
319
320    pub(crate) fn poll_multishot<T: OpCode>(
321        &self,
322        waker: &Waker,
323        key: &Key<T>,
324    ) -> Option<BufResult<usize, Extra>> {
325        instrument!(compio_log::Level::DEBUG, "poll_multishot", ?key);
326        let mut driver = self.driver.borrow_mut();
327        if let Some(res) = driver.pop_multishot(key) {
328            return Some(res);
329        }
330        driver.update_waker(key, waker);
331        None
332    }
333
334    #[cfg(feature = "time")]
335    pub(crate) fn poll_timer(&self, cx: &mut Context, key: &TimerKey) -> Poll<()> {
336        instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key);
337        let mut timer_runtime = self.timer_runtime.borrow_mut();
338        if timer_runtime.is_completed(key) {
339            debug!("ready");
340            Poll::Ready(())
341        } else {
342            debug!("pending");
343            timer_runtime.update_waker(key, cx.waker());
344            Poll::Pending
345        }
346    }
347
348    /// Low level API to control the runtime.
349    ///
350    /// Get the timeout value to be passed to [`Proactor::poll`].
351    pub fn current_timeout(&self) -> Option<Duration> {
352        #[cfg(not(feature = "time"))]
353        let timeout = None;
354        #[cfg(feature = "time")]
355        let timeout = self.timer_runtime.borrow().min_timeout();
356        timeout
357    }
358
359    /// Low level API to control the runtime.
360    ///
361    /// Poll the inner proactor. It is equal to calling [`Runtime::poll_with`]
362    /// with [`Runtime::current_timeout`].
363    pub fn poll(&self) {
364        instrument!(compio_log::Level::DEBUG, "poll");
365        let timeout = self.current_timeout();
366        debug!("timeout: {:?}", timeout);
367        self.poll_with(timeout)
368    }
369
370    /// Low level API to control the runtime.
371    ///
372    /// Poll the inner proactor with a custom timeout.
373    pub fn poll_with(&self, timeout: Option<Duration>) {
374        instrument!(compio_log::Level::DEBUG, "poll_with");
375
376        let mut driver = self.driver.borrow_mut();
377        match driver.poll(timeout) {
378            Ok(()) => {}
379            Err(e) => match e.kind() {
380                io::ErrorKind::TimedOut | io::ErrorKind::Interrupted => {
381                    debug!("expected error: {e}");
382                }
383                _ => panic!("{e:?}"),
384            },
385        }
386        #[cfg(feature = "time")]
387        self.timer_runtime.borrow_mut().wake();
388    }
389
390    /// Get buffer pool of the runtime.
391    ///
392    /// This will lazily initialize the pool at the first time it's accessed,
393    /// and future access to the pool will be cheap and infallible.
394    pub fn buffer_pool(&self) -> io::Result<BufferPool> {
395        self.driver.borrow_mut().buffer_pool()
396    }
397
398    /// Register file descriptors for fixed-file operations.
399    ///
400    /// This is only supported on io-uring driver, and will return an
401    /// [`Unsupported`] io error on all other drivers.
402    ///
403    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
404    pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
405        self.driver.borrow_mut().register_files(fds)
406    }
407
408    /// Unregister previously registered file descriptors.
409    ///
410    /// This is only supported on io-uring driver, and will return an
411    /// [`Unsupported`] io error on all other drivers.
412    ///
413    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
414    pub fn unregister_files(&self) -> io::Result<()> {
415        self.driver.borrow_mut().unregister_files()
416    }
417
418    /// Register the personality for the runtime.
419    ///
420    /// This is only supported on io-uring driver, and will return an
421    /// [`Unsupported`] io error on all other drivers.
422    ///
423    /// The returned personality can be used with
424    /// [`FutureExt::with_personality`].
425    ///
426    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
427    pub fn register_personality(&self) -> io::Result<u16> {
428        self.driver.borrow_mut().register_personality()
429    }
430
431    /// Unregister the given personality for the runtime.
432    ///
433    /// This is only supported on io-uring driver, and will return an
434    /// [`Unsupported`] io error on all other drivers.
435    ///
436    /// [`Unsupported`]: std::io::ErrorKind::Unsupported
437    pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
438        self.driver.borrow_mut().unregister_personality(personality)
439    }
440}
441
442impl Drop for Runtime {
443    fn drop(&mut self) {
444        // this is not the last runtime reference, no need to clear
445        if Rc::strong_count(&self.0) > 1 {
446            return;
447        }
448
449        self.enter(|| {
450            self.executor.clear();
451        })
452    }
453}
454
455impl AsRawFd for Runtime {
456    fn as_raw_fd(&self) -> RawFd {
457        self.driver.borrow().as_raw_fd()
458    }
459}
460
461#[cfg(feature = "criterion")]
462impl criterion::async_executor::AsyncExecutor for Runtime {
463    fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
464        self.block_on(future)
465    }
466}
467
468#[cfg(feature = "criterion")]
469impl criterion::async_executor::AsyncExecutor for &Runtime {
470    fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
471        (**self).block_on(future)
472    }
473}
474
475/// Builder for [`Runtime`].
476#[derive(Debug, Clone)]
477pub struct RuntimeBuilder {
478    proactor_builder: ProactorBuilder,
479    thread_affinity: HashSet<usize>,
480    sync_queue_size: usize,
481    local_queue_size: usize,
482    event_interval: u32,
483}
484
485impl Default for RuntimeBuilder {
486    fn default() -> Self {
487        Self::new()
488    }
489}
490
491impl RuntimeBuilder {
492    /// Create the builder with default config.
493    pub fn new() -> Self {
494        Self {
495            proactor_builder: ProactorBuilder::new(),
496            event_interval: 61,
497            sync_queue_size: 64,
498            local_queue_size: 64,
499            thread_affinity: HashSet::new(),
500        }
501    }
502
503    /// Replace proactor builder.
504    pub fn with_proactor(&mut self, builder: ProactorBuilder) -> &mut Self {
505        self.proactor_builder = builder;
506        self
507    }
508
509    /// Sets the thread affinity for the runtime.
510    pub fn thread_affinity(&mut self, cpus: HashSet<usize>) -> &mut Self {
511        self.thread_affinity = cpus;
512        self
513    }
514
515    /// Sets the number of scheduler ticks after which the scheduler will poll
516    /// for external events (timers, I/O, and so on).
517    ///
518    /// A scheduler “tick” roughly corresponds to one poll invocation on a task.
519    pub fn event_interval(&mut self, val: usize) -> &mut Self {
520        self.event_interval = val as _;
521        self
522    }
523
524    /// The size of the sync queue, which is used to wake up tasks from other
525    /// threads (remote).
526    ///
527    /// This is fixed and will create backpressure in other remote threads when
528    /// full.
529    pub fn sync_queue_size(&mut self, val: usize) -> &mut Self {
530        self.sync_queue_size = val;
531        self
532    }
533
534    /// The size of the local queues, which is used to wake up tasks within the
535    /// same thread.
536    ///
537    /// This is dynamically resized to avoid blocking.
538    pub fn local_queue_size(&mut self, val: usize) -> &mut Self {
539        self.local_queue_size = val;
540        self
541    }
542
543    /// Build [`Runtime`].
544    pub fn build(&self) -> io::Result<Runtime> {
545        let RuntimeBuilder {
546            proactor_builder,
547            thread_affinity,
548            sync_queue_size,
549            local_queue_size,
550            event_interval,
551        } = self;
552
553        if !thread_affinity.is_empty() {
554            bind_to_cpu_set(thread_affinity);
555        }
556        let driver = proactor_builder.build()?;
557        let executor = Executor::with_config(ExecutorConfig {
558            max_interval: *event_interval,
559            sync_queue_size: *sync_queue_size,
560            local_queue_size: *local_queue_size,
561            waker: Some(driver.waker()),
562        });
563        let inner = RuntimeInner {
564            executor,
565            driver: RefCell::new(driver),
566            #[cfg(feature = "time")]
567            timer_runtime: RefCell::new(TimerRuntime::new()),
568        };
569        Ok(Runtime(Rc::new(inner)))
570    }
571}
572
573/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
574///
575/// Spawning a task enables the task to execute concurrently to other tasks.
576/// There is no guarantee that a spawned task will execute to completion.
577///
578/// ```
579/// # compio_runtime::Runtime::new().unwrap().block_on(async {
580/// use compio_runtime::ResumeUnwind;
581///
582/// let task = compio_runtime::spawn(async {
583///     println!("Hello from a spawned task!");
584///     42
585/// });
586///
587/// assert_eq!(
588///     task.await.resume_unwind().expect("shouldn't be cancelled"),
589///     42
590/// );
591/// # })
592/// ```
593///
594/// ## Panics
595///
596/// This method doesn't create runtime. It tries to obtain the current runtime
597/// by [`Runtime::with_current`].
598pub fn spawn<F: Future + 'static>(future: F) -> JoinHandle<F::Output> {
599    Runtime::with_current(|r| r.spawn(future))
600}
601
602/// Spawns a blocking task in a new thread, and wait for it.
603///
604/// The task will not be cancelled even if the future is dropped.
605///
606/// ## Panics
607///
608/// This method doesn't create runtime. It tries to obtain the current runtime
609/// by [`Runtime::with_current`].
610pub fn spawn_blocking<T: Send + 'static>(
611    f: impl (FnOnce() -> T) + Send + 'static,
612) -> JoinHandle<T> {
613    Runtime::with_current(|r| r.spawn_blocking(f))
614}
615
616/// Submit an operation to the current runtime, and return a future for it.
617///
618/// ## Panics
619///
620/// This method doesn't create runtime and will panic if it's not within a
621/// runtime. It tries to obtain the current runtime with
622/// [`Runtime::with_current`].
623pub fn submit<T: OpCode + 'static>(op: T) -> Submit<T> {
624    Runtime::with_current(|r| r.submit(op))
625}
626
627/// Submit a multishot operation to the current runtime, and return a stream for
628/// it.
629///
630/// ## Panics
631///
632/// This method doesn't create runtime and will panic if it's not within a
633/// runtime. It tries to obtain the current runtime with
634/// [`Runtime::with_current`].
635pub fn submit_multi<T: OpCode + 'static>(op: T) -> SubmitMulti<T> {
636    Runtime::with_current(|r| r.submit_multi(op))
637}
638
639/// Register file descriptors for fixed-file operations with the current
640/// runtime's io_uring instance.
641///
642/// This only works on `io_uring` driver. It will return an [`Unsupported`]
643/// error on other drivers.
644///
645/// ## Panics
646///
647/// This method doesn't create runtime. It tries to obtain the current runtime
648/// by [`Runtime::with_current`].
649///
650/// [`Unsupported`]: std::io::ErrorKind::Unsupported
651pub fn register_files(fds: &[RawFd]) -> io::Result<()> {
652    Runtime::with_current(|r| r.register_files(fds))
653}
654
655/// Unregister previously registered file descriptors from the current
656/// runtime's io_uring instance.
657///
658/// This only works on `io_uring` driver. It will return an [`Unsupported`]
659/// error on other drivers.
660///
661/// ## Panics
662///
663/// This method doesn't create runtime. It tries to obtain the current runtime
664/// by [`Runtime::with_current`].
665///
666/// [`Unsupported`]: std::io::ErrorKind::Unsupported
667pub fn unregister_files() -> io::Result<()> {
668    Runtime::with_current(|r| r.unregister_files())
669}
670
671#[cfg(feature = "time")]
672pub(crate) async fn create_timer(instant: std::time::Instant) {
673    let key = Runtime::with_current(|r| r.timer_runtime.borrow_mut().insert(instant));
674    if let Some(key) = key {
675        TimerFuture::new(key).await
676    }
677}