Skip to main content

openraft_rt/
async_runtime.rs

1//! `async` runtime interface.
2//!
3//! `async` runtime is an abstraction over different asynchronous runtimes, such as `tokio`,
4//! `async-std`, etc.
5
6use std::fmt::Debug;
7use std::fmt::Display;
8use std::future::Future;
9use std::io;
10use std::time::Duration;
11
12use crate::Instant;
13use crate::Mpsc;
14use crate::Mutex;
15use crate::Oneshot;
16use crate::OptionalSend;
17use crate::OptionalSync;
18use crate::Watch;
19
20/// A trait defining interfaces with an asynchronous runtime.
21///
22/// The intention of this trait is to allow an application using this crate to bind an asynchronous
23/// runtime that suits it the best.
24///
25/// Some additional related functions are also exposed by this trait.
26///
27/// ## Note
28///
29/// The default asynchronous runtime is `tokio`.
30pub trait AsyncRuntime: Debug + OptionalSend + OptionalSync + 'static {
31    /// The error type of [`Self::JoinHandle`].
32    type JoinError: Debug + Display + OptionalSend;
33
34    /// The return type of [`Self::spawn`].
35    type JoinHandle<T: OptionalSend + 'static>: Future<Output = Result<T, Self::JoinError>>
36        + OptionalSend
37        + OptionalSync
38        + Unpin;
39
40    /// The type that enables the user to sleep in an asynchronous runtime.
41    type Sleep: Future<Output = ()> + OptionalSend + OptionalSync;
42
43    /// A measurement of a monotonically non-decreasing clock.
44    type Instant: Instant;
45
46    /// The timeout error type.
47    type TimeoutError: Debug + Display + OptionalSend;
48
49    /// The timeout type used by [`Self::timeout`] and [`Self::timeout_at`] that enables the user
50    /// to await the outcome of a [`Future`].
51    type Timeout<R, T: Future<Output = R> + OptionalSend>: Future<Output = Result<R, Self::TimeoutError>> + OptionalSend;
52
53    /// Type of thread-local random number generator.
54    type ThreadLocalRng: rand::Rng;
55
56    /// Spawn a new task.
57    #[track_caller]
58    fn spawn<T>(future: T) -> Self::JoinHandle<T::Output>
59    where
60        T: Future + OptionalSend + 'static,
61        T::Output: OptionalSend + 'static;
62
63    /// Wait until `duration` has elapsed.
64    #[track_caller]
65    fn sleep(duration: Duration) -> Self::Sleep;
66
67    /// Wait until `deadline` is reached.
68    #[track_caller]
69    fn sleep_until(deadline: Self::Instant) -> Self::Sleep;
70
71    /// Require a [`Future`] to complete before the specified duration has elapsed.
72    #[track_caller]
73    fn timeout<R, F: Future<Output = R> + OptionalSend>(duration: Duration, future: F) -> Self::Timeout<R, F>;
74
75    /// Require a [`Future`] to complete before the specified instant in time.
76    #[track_caller]
77    fn timeout_at<R, F: Future<Output = R> + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout<R, F>;
78
79    /// Check if the [`Self::JoinError`] is `panic`.
80    #[track_caller]
81    fn is_panic(join_error: &Self::JoinError) -> bool;
82
83    /// Get the random number generator to use for generating random numbers.
84    ///
85    /// # Note
86    ///
87    /// This is a per-thread instance, which cannot be shared across threads or
88    /// sent to another thread.
89    #[track_caller]
90    fn thread_rng() -> Self::ThreadLocalRng;
91
92    /// The bounded MPSC channel implementation.
93    type Mpsc: Mpsc;
94
95    /// The watch channel implementation.
96    type Watch: Watch;
97
98    /// The oneshot channel implementation.
99    type Oneshot: Oneshot;
100
101    /// The async mutex implementation.
102    type Mutex<T: OptionalSend + 'static>: Mutex<T>;
103
104    /// Create a new runtime instance for testing purposes.
105    ///
106    /// **Note**: This method is primarily intended for testing and is not used by Openraft
107    /// internally. In production applications, the runtime should be created and managed
108    /// by the application itself, with Openraft running within that runtime.
109    ///
110    /// # Arguments
111    ///
112    /// * `threads` - Number of worker threads. Multi-threaded runtimes (like Tokio) will use this
113    ///   value; single-threaded runtimes (like Monoio, Compio) may ignore it.
114    fn new(threads: usize) -> Self;
115
116    /// Run a future to completion on this runtime.
117    ///
118    /// This runs synchronously on the current thread, so `Send` is not required.
119    fn block_on<F, T>(&mut self, future: F) -> T
120    where
121        F: Future<Output = T>,
122        T: OptionalSend;
123
124    /// Convenience method: create a runtime and run the future to completion.
125    ///
126    /// Creates a runtime with default configuration (8 threads) and runs the future.
127    /// For simple cases where you don't need to reuse the runtime.
128    /// If you need to run multiple futures, consider using [`Self::new`] and
129    /// [`Self::block_on`] directly.
130    ///
131    /// This runs synchronously on the current thread, so `Send` is not required.
132    fn run<F, T>(future: F) -> T
133    where
134        Self: Sized,
135        F: Future<Output = T>,
136        T: OptionalSend,
137    {
138        Self::new(8).block_on(future)
139    }
140
141    /// Run a blocking function on a separate thread.
142    ///
143    /// The default implementation spawns a new OS thread for each call.
144    /// Runtime implementations may override this with their own thread pool
145    /// (e.g., tokio's `spawn_blocking`) for better resource management.
146    fn spawn_blocking<F, T>(f: F) -> impl Future<Output = Result<T, io::Error>> + Send
147    where
148        F: FnOnce() -> T + Send + 'static,
149        T: Send + 'static,
150    {
151        let (tx, rx) = futures_channel::oneshot::channel();
152        std::thread::spawn(move || {
153            tx.send(f()).ok();
154        });
155        async { rx.await.map_err(|_| io::Error::other("spawn_blocking task cancelled")) }
156    }
157}