openraft_rt_monoio/
lib.rs1use std::future::Future;
26use std::time::Duration;
27
28use openraft_rt::AsyncRuntime;
29use openraft_rt::OptionalSend;
30
31mod instant;
32mod mpsc;
33mod mutex;
34mod oneshot;
35mod watch;
36
37use instant::MonoioInstant;
38use mpsc::MonoioMpsc;
39use mutex::TokioMutex;
40use oneshot::MonoioOneshot;
41use watch::TokioWatch;
42
43#[cfg(target_os = "linux")]
47type InnerRuntime = monoio::FusionRuntime<
48 monoio::time::TimeDriver<monoio::IoUringDriver>,
49 monoio::time::TimeDriver<monoio::LegacyDriver>,
50>;
51
52#[cfg(not(target_os = "linux"))]
53type InnerRuntime = monoio::FusionRuntime<monoio::time::TimeDriver<monoio::LegacyDriver>>;
54
55pub struct MonoioRuntime {
57 rt: InnerRuntime,
58}
59
60impl std::fmt::Debug for MonoioRuntime {
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 f.debug_struct("MonoioRuntime").finish()
63 }
64}
65
66impl AsyncRuntime for MonoioRuntime {
67 type JoinError = std::convert::Infallible;
69 type JoinHandle<T: OptionalSend + 'static> = monoio::task::JoinHandle<Result<T, Self::JoinError>>;
70 type Sleep = monoio::time::Sleep;
71 type Instant = MonoioInstant;
72 type TimeoutError = monoio::time::error::Elapsed;
73 type Timeout<R, T: Future<Output = R> + OptionalSend> = monoio::time::Timeout<T>;
74 type ThreadLocalRng = rand::rngs::ThreadRng;
75
76 #[inline]
77 fn spawn<T>(future: T) -> Self::JoinHandle<T::Output>
78 where
79 T: Future + OptionalSend + 'static,
80 T::Output: OptionalSend + 'static,
81 {
82 monoio::spawn(async move { Ok(future.await) })
83 }
84
85 #[inline]
86 fn sleep(duration: Duration) -> Self::Sleep {
87 monoio::time::sleep(duration)
88 }
89
90 #[inline]
91 fn sleep_until(deadline: Self::Instant) -> Self::Sleep {
92 monoio::time::sleep_until(deadline.0)
93 }
94
95 #[inline]
96 fn timeout<R, F: Future<Output = R> + OptionalSend>(duration: Duration, future: F) -> Self::Timeout<R, F> {
97 monoio::time::timeout(duration, future)
98 }
99
100 #[inline]
101 fn timeout_at<R, F: Future<Output = R> + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout<R, F> {
102 monoio::time::timeout_at(deadline.0, future)
103 }
104
105 #[inline]
106 fn is_panic(join_error: &Self::JoinError) -> bool {
107 match *join_error {}
108 }
109
110 #[inline]
111 fn thread_rng() -> Self::ThreadLocalRng {
112 rand::rng()
113 }
114
115 type Mpsc = MonoioMpsc;
116 type Watch = TokioWatch;
117 type Oneshot = MonoioOneshot;
118 type Mutex<T: OptionalSend + 'static> = TokioMutex<T>;
119
120 fn new(_threads: usize) -> Self {
121 let rt = monoio::RuntimeBuilder::<monoio::FusionDriver>::new()
123 .enable_all()
124 .build()
125 .expect("Failed to create Monoio runtime");
126 MonoioRuntime { rt }
127 }
128
129 fn block_on<F, T>(&mut self, future: F) -> T
130 where
131 F: Future<Output = T>,
132 T: OptionalSend,
133 {
134 self.rt.block_on(future)
135 }
136}
137
#[cfg(test)]
mod tests {
    use openraft_rt::testing::Suite;

    use super::*;

    /// Runs the full `openraft_rt::testing::Suite` against `MonoioRuntime`.
    #[test]
    fn test_monoio_rt() {
        // Build the suite future first, then hand it to the runtime to drive.
        let suite = Suite::<MonoioRuntime>::test_all();
        MonoioRuntime::run(suite);
    }
}