openraft_rt/async_runtime.rs
1//! `async` runtime interface.
2//!
3//! `async` runtime is an abstraction over different asynchronous runtimes, such as `tokio`,
4//! `async-std`, etc.
5
6use std::fmt::Debug;
7use std::fmt::Display;
8use std::future::Future;
9use std::io;
10use std::time::Duration;
11
12use crate::Instant;
13use crate::Mpsc;
14use crate::Mutex;
15use crate::Oneshot;
16use crate::OptionalSend;
17use crate::OptionalSync;
18use crate::Watch;
19
20/// A trait defining interfaces with an asynchronous runtime.
21///
22/// The intention of this trait is to allow an application using this crate to bind an asynchronous
23/// runtime that suits it the best.
24///
25/// Some additional related functions are also exposed by this trait.
26///
27/// ## Note
28///
29/// The default asynchronous runtime is `tokio`.
30pub trait AsyncRuntime: Debug + OptionalSend + OptionalSync + 'static {
31 /// The error type of [`Self::JoinHandle`].
32 type JoinError: Debug + Display + OptionalSend;
33
34 /// The return type of [`Self::spawn`].
35 type JoinHandle<T: OptionalSend + 'static>: Future<Output = Result<T, Self::JoinError>>
36 + OptionalSend
37 + OptionalSync
38 + Unpin;
39
40 /// The type that enables the user to sleep in an asynchronous runtime.
41 type Sleep: Future<Output = ()> + OptionalSend + OptionalSync;
42
43 /// A measurement of a monotonically non-decreasing clock.
44 type Instant: Instant;
45
46 /// The timeout error type.
47 type TimeoutError: Debug + Display + OptionalSend;
48
49 /// The timeout type used by [`Self::timeout`] and [`Self::timeout_at`] that enables the user
50 /// to await the outcome of a [`Future`].
51 type Timeout<R, T: Future<Output = R> + OptionalSend>: Future<Output = Result<R, Self::TimeoutError>> + OptionalSend;
52
53 /// Type of thread-local random number generator.
54 type ThreadLocalRng: rand::Rng;
55
56 /// Spawn a new task.
57 #[track_caller]
58 fn spawn<T>(future: T) -> Self::JoinHandle<T::Output>
59 where
60 T: Future + OptionalSend + 'static,
61 T::Output: OptionalSend + 'static;
62
63 /// Wait until `duration` has elapsed.
64 #[track_caller]
65 fn sleep(duration: Duration) -> Self::Sleep;
66
67 /// Wait until `deadline` is reached.
68 #[track_caller]
69 fn sleep_until(deadline: Self::Instant) -> Self::Sleep;
70
71 /// Require a [`Future`] to complete before the specified duration has elapsed.
72 #[track_caller]
73 fn timeout<R, F: Future<Output = R> + OptionalSend>(duration: Duration, future: F) -> Self::Timeout<R, F>;
74
75 /// Require a [`Future`] to complete before the specified instant in time.
76 #[track_caller]
77 fn timeout_at<R, F: Future<Output = R> + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout<R, F>;
78
79 /// Check if the [`Self::JoinError`] is `panic`.
80 #[track_caller]
81 fn is_panic(join_error: &Self::JoinError) -> bool;
82
83 /// Get the random number generator to use for generating random numbers.
84 ///
85 /// # Note
86 ///
87 /// This is a per-thread instance, which cannot be shared across threads or
88 /// sent to another thread.
89 #[track_caller]
90 fn thread_rng() -> Self::ThreadLocalRng;
91
92 /// The bounded MPSC channel implementation.
93 type Mpsc: Mpsc;
94
95 /// The watch channel implementation.
96 type Watch: Watch;
97
98 /// The oneshot channel implementation.
99 type Oneshot: Oneshot;
100
101 /// The async mutex implementation.
102 type Mutex<T: OptionalSend + 'static>: Mutex<T>;
103
104 /// Create a new runtime instance for testing purposes.
105 ///
106 /// **Note**: This method is primarily intended for testing and is not used by Openraft
107 /// internally. In production applications, the runtime should be created and managed
108 /// by the application itself, with Openraft running within that runtime.
109 ///
110 /// # Arguments
111 ///
112 /// * `threads` - Number of worker threads. Multi-threaded runtimes (like Tokio) will use this
113 /// value; single-threaded runtimes (like Monoio, Compio) may ignore it.
114 fn new(threads: usize) -> Self;
115
116 /// Run a future to completion on this runtime.
117 ///
118 /// This runs synchronously on the current thread, so `Send` is not required.
119 fn block_on<F, T>(&mut self, future: F) -> T
120 where
121 F: Future<Output = T>,
122 T: OptionalSend;
123
124 /// Convenience method: create a runtime and run the future to completion.
125 ///
126 /// Creates a runtime with default configuration (8 threads) and runs the future.
127 /// For simple cases where you don't need to reuse the runtime.
128 /// If you need to run multiple futures, consider using [`Self::new`] and
129 /// [`Self::block_on`] directly.
130 ///
131 /// This runs synchronously on the current thread, so `Send` is not required.
132 fn run<F, T>(future: F) -> T
133 where
134 Self: Sized,
135 F: Future<Output = T>,
136 T: OptionalSend,
137 {
138 Self::new(8).block_on(future)
139 }
140
141 /// Run a blocking function on a separate thread.
142 ///
143 /// The default implementation spawns a new OS thread for each call.
144 /// Runtime implementations may override this with their own thread pool
145 /// (e.g., tokio's `spawn_blocking`) for better resource management.
146 fn spawn_blocking<F, T>(f: F) -> impl Future<Output = Result<T, io::Error>> + Send
147 where
148 F: FnOnce() -> T + Send + 'static,
149 T: Send + 'static,
150 {
151 let (tx, rx) = futures_channel::oneshot::channel();
152 std::thread::spawn(move || {
153 tx.send(f()).ok();
154 });
155 async { rx.await.map_err(|_| io::Error::other("spawn_blocking task cancelled")) }
156 }
157}