Skip to main content

openraft_rt_monoio/
lib.rs

//! This crate provides a [`MonoioRuntime`] type, which implements [`AsyncRuntime`]
//! so that you can use Openraft with [Monoio](monoio).
//!
//! ```ignore
//! pub struct TypeConfig {}
//!
//! impl openraft::RaftTypeConfig for TypeConfig {
//!     // Other types are omitted
//!
//!     type AsyncRuntime = openraft_rt_monoio::MonoioRuntime;
//! }
//! ```
//!
//! # NOTE
//!
//! 1. For the Openraft dependency used with this crate:
//!    1. You can disable the `default` feature, as you don't need the built-in Tokio runtime.
//!    2. The `single-threaded` feature needs to be enabled or this crate won't work.
//! 2. With the `single-threaded` feature enabled, the handle type [`Raft`](openraft::Raft) will
//!    no longer be [`Send`] or [`Sync`].
//! 3. Even though this crate allows you to use Monoio, it still uses some primitives from Tokio:
//!    1. `Watch`: Monoio (or `local_sync`) does not have a watch channel.
//!    2. `Mutex`: Monoio does not provide a Mutex implementation.
24
25use std::future::Future;
26use std::time::Duration;
27
28use openraft_rt::AsyncRuntime;
29use openraft_rt::OptionalSend;
30
31mod instant;
32mod mpsc;
33mod mutex;
34mod oneshot;
35mod watch;
36
37use instant::MonoioInstant;
38use mpsc::MonoioMpsc;
39use mutex::TokioMutex;
40use oneshot::MonoioOneshot;
41use watch::TokioWatch;
42
43/// The monoio runtime type varies by platform.
44/// On Linux, FusionRuntime has two drivers (iouring + legacy fallback).
45/// On other platforms, only legacy driver is available.
46#[cfg(target_os = "linux")]
47type InnerRuntime = monoio::FusionRuntime<
48    monoio::time::TimeDriver<monoio::IoUringDriver>,
49    monoio::time::TimeDriver<monoio::LegacyDriver>,
50>;
51
52#[cfg(not(target_os = "linux"))]
53type InnerRuntime = monoio::FusionRuntime<monoio::time::TimeDriver<monoio::LegacyDriver>>;
54
55/// [`AsyncRuntime`] implementation for Monoio.
56pub struct MonoioRuntime {
57    rt: InnerRuntime,
58}
59
60impl std::fmt::Debug for MonoioRuntime {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        f.debug_struct("MonoioRuntime").finish()
63    }
64}
65
66impl AsyncRuntime for MonoioRuntime {
67    // Joining an async task on Monoio always succeeds
68    type JoinError = std::convert::Infallible;
69    type JoinHandle<T: OptionalSend + 'static> = monoio::task::JoinHandle<Result<T, Self::JoinError>>;
70    type Sleep = monoio::time::Sleep;
71    type Instant = MonoioInstant;
72    type TimeoutError = monoio::time::error::Elapsed;
73    type Timeout<R, T: Future<Output = R> + OptionalSend> = monoio::time::Timeout<T>;
74    type ThreadLocalRng = rand::rngs::ThreadRng;
75
76    #[inline]
77    fn spawn<T>(future: T) -> Self::JoinHandle<T::Output>
78    where
79        T: Future + OptionalSend + 'static,
80        T::Output: OptionalSend + 'static,
81    {
82        monoio::spawn(async move { Ok(future.await) })
83    }
84
85    #[inline]
86    fn sleep(duration: Duration) -> Self::Sleep {
87        monoio::time::sleep(duration)
88    }
89
90    #[inline]
91    fn sleep_until(deadline: Self::Instant) -> Self::Sleep {
92        monoio::time::sleep_until(deadline.0)
93    }
94
95    #[inline]
96    fn timeout<R, F: Future<Output = R> + OptionalSend>(duration: Duration, future: F) -> Self::Timeout<R, F> {
97        monoio::time::timeout(duration, future)
98    }
99
100    #[inline]
101    fn timeout_at<R, F: Future<Output = R> + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout<R, F> {
102        monoio::time::timeout_at(deadline.0, future)
103    }
104
105    #[inline]
106    fn is_panic(join_error: &Self::JoinError) -> bool {
107        match *join_error {}
108    }
109
110    #[inline]
111    fn thread_rng() -> Self::ThreadLocalRng {
112        rand::rng()
113    }
114
115    type Mpsc = MonoioMpsc;
116    type Watch = TokioWatch;
117    type Oneshot = MonoioOneshot;
118    type Mutex<T: OptionalSend + 'static> = TokioMutex<T>;
119
120    fn new(_threads: usize) -> Self {
121        // Monoio is single-threaded, ignores threads parameter
122        let rt = monoio::RuntimeBuilder::<monoio::FusionDriver>::new()
123            .enable_all()
124            .build()
125            .expect("Failed to create Monoio runtime");
126        MonoioRuntime { rt }
127    }
128
129    fn block_on<F, T>(&mut self, future: F) -> T
130    where
131        F: Future<Output = T>,
132        T: OptionalSend,
133    {
134        self.rt.block_on(future)
135    }
136}
137
#[cfg(test)]
mod tests {
    use openraft_rt::testing::Suite;

    use super::*;

    /// Runs the shared `openraft_rt` runtime test suite against [`MonoioRuntime`].
    #[test]
    fn test_monoio_rt() {
        let suite = Suite::<MonoioRuntime>::test_all();
        MonoioRuntime::run(suite);
    }
}