1#![cfg_attr(not(feature = "std"), no_std)]
2
3use core::{
4 future::Future,
5 pin::Pin,
6 time::Duration,
7 task::{Context, Poll},
8};
9#[cfg(feature = "std")]
10use std::rc::Rc;
11
/// Abstraction over the async runtime that drives modrpc tasks and timers.
///
/// An implementation bundles a task spawner, a way to run a future to completion, and
/// constructors for the runtime's timer primitives.
pub trait ModrpcExecutor {
    /// Future returned by `sleep`; it resolves to `()`.
    type Sleep: Future<Output = ()> + 'static;
    /// Repeating timer returned by `interval`.
    type Interval: Interval + 'static;
    /// Re-armable timer returned by `new_sleeper`.
    type Sleeper: Sleeper + 'static;

    /// Creates a new executor instance.
    fn new() -> Self;
    /// Returns a spawner handle associated with this executor.
    fn spawner(&mut self) -> ispawn::LocalSpawner;
    /// Runs `future` on this executor and returns its output.
    ///
    /// Some implementations in this file cannot block on a future and panic instead.
    fn run_until<R>(&mut self, future: impl Future<Output = R>) -> R;
    /// Creates a one-shot timer future for `duration`.
    fn sleep(duration: Duration) -> Self::Sleep;
    /// Creates a repeating timer with the given `period`.
    fn interval(period: Duration) -> Self::Interval;
    /// Creates a sleeper with no sleep armed.
    fn new_sleeper() -> Self::Sleeper;
}
24
/// A repeating timer that is awaited one tick at a time.
pub trait Interval {
    /// Waits for the next tick of the interval.
    // The `async_fn_in_trait` lint warns that callers cannot require auto traits such as
    // `Send` on the future returned by an `async fn` in a public trait; it is silenced here.
    #[allow(async_fn_in_trait)]
    async fn tick(&mut self);
}
29
/// A re-armable, poll-based timer that holds at most one pending sleep.
///
/// The contract described on the methods below is the one followed by the implementations
/// in this file (`TokioSleeper` and `AsyncTimerSleeper`).
pub trait Sleeper {
    /// Arms the sleeper for `duration` if no sleep is currently stored.
    ///
    /// Returns `true` when a new sleep was started. Returns `false` when an earlier sleep is
    /// still stored; in that case `duration` is ignored and the existing sleep is untouched.
    fn snooze(self: Pin<&mut Self>, duration: Duration) -> bool;

    /// Polls the stored sleep, if any.
    ///
    /// Returns `Poll::Ready(())` when the stored sleep has completed (the sleeper is then
    /// cleared so it can be armed again) or when no sleep is stored at all. Returns
    /// `Poll::Pending` while the stored sleep has not yet completed.
    fn poll_sleep(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;
}
38
/// Implements `Interval` for `async_timer::Interval` by awaiting the interval itself.
#[cfg(feature = "async-timer")]
impl Interval for async_timer::Interval {
    /// Completes once awaiting `self` resolves.
    async fn tick(&mut self) {
        // `self` is `&mut async_timer::Interval` here; awaiting it presumably waits for the
        // next period to elapse — TODO confirm against the async_timer version in use.
        self.await;
    }
}
45
/// Executor backed by a `futures_executor::LocalPool`.
#[cfg(feature = "futures-executor")]
pub struct FuturesExecutor {
    // Pool used both to hand out spawners and to run futures to completion.
    local_pool: futures_executor::LocalPool,
}
50
/// `ModrpcExecutor` implementation that runs tasks on a `futures_executor::LocalPool` and
/// takes its timers from `async_timer`.
#[cfg(feature = "futures-executor")]
impl ModrpcExecutor for FuturesExecutor {
    type Sleep = async_timer::timer::Platform;
    type Interval = async_timer::Interval;
    type Sleeper = AsyncTimerSleeper;

    /// Creates an executor around a fresh `LocalPool`.
    fn new() -> Self {
        Self {
            local_pool: futures_executor::LocalPool::new(),
        }
    }

    /// Wraps a new spawner obtained from the pool in an `ispawn::LocalSpawner`.
    fn spawner(&mut self) -> ispawn::LocalSpawner {
        let pool_spawner = Rc::new(self.local_pool.spawner());
        ispawn::LocalSpawner::new(pool_spawner)
    }

    /// Runs `future` on the pool and returns its output.
    fn run_until<R>(&mut self, future: impl Future<Output = R>) -> R {
        let pool = &mut self.local_pool;
        pool.run_until(future)
    }

    /// Creates an `async_timer` platform timer for `duration`.
    fn sleep(duration: Duration) -> Self::Sleep {
        let timer = async_timer::timer::Platform::new(duration);
        timer
    }

    /// Creates an `async_timer` interval with the given `period`.
    fn interval(period: Duration) -> Self::Interval {
        let ticker = async_timer::Interval::new(period);
        ticker
    }

    /// Creates a sleeper with no sleep armed.
    fn new_sleeper() -> Self::Sleeper {
        let current_sleep = None;
        AsyncTimerSleeper { current_sleep }
    }
}
82
/// Executor that pairs a Tokio runtime with a `tokio::task::LocalSet`.
#[cfg(feature = "tokio")]
pub struct TokioExecutor {
    // Runtime on which the local set is driven in `run_until`.
    rt: tokio::runtime::Runtime,
    // Reference-counted so that `spawner` can hand out clones of the same local set.
    local_set: Rc<tokio::task::LocalSet>,
}
88
/// Accessors for the Tokio primitives owned by the executor.
#[cfg(feature = "tokio")]
impl TokioExecutor {
    /// Borrows the underlying Tokio runtime.
    pub fn tokio_runtime(&self) -> &tokio::runtime::Runtime {
        let Self { rt, .. } = self;
        rt
    }

    /// Borrows the `LocalSet` shared with the spawners handed out by this executor.
    pub fn local_set(&self) -> &tokio::task::LocalSet {
        &*self.local_set
    }
}
99
/// `ModrpcExecutor` implementation built on a current-thread Tokio runtime and a `LocalSet`.
#[cfg(feature = "tokio")]
impl ModrpcExecutor for TokioExecutor {
    type Sleep = tokio::time::Sleep;
    type Interval = tokio::time::Interval;
    type Sleeper = TokioSleeper;

    /// Builds a current-thread runtime with all drivers enabled, plus a fresh `LocalSet`.
    ///
    /// # Panics
    ///
    /// Panics if the Tokio runtime cannot be built.
    fn new() -> Self {
        let mut builder = tokio::runtime::Builder::new_current_thread();
        builder.enable_all();
        let rt = builder.build().expect("new modrpc tokio runtime");

        Self {
            rt,
            local_set: Rc::new(tokio::task::LocalSet::new()),
        }
    }

    /// Wraps a clone of the shared `LocalSet` handle in an `ispawn::LocalSpawner`.
    fn spawner(&mut self) -> ispawn::LocalSpawner {
        let shared_set = self.local_set.clone();
        ispawn::LocalSpawner::new(shared_set)
    }

    /// Drives `future` on the local set using the owned runtime and returns its output.
    fn run_until<R>(&mut self, future: impl Future<Output = R>) -> R {
        let runtime = &self.rt;
        self.local_set.block_on(runtime, future)
    }

    /// Creates a `tokio::time::Sleep` for `duration`.
    // NOTE(review): tokio documents that its timers need a runtime context — confirm callers
    // only invoke this from inside `run_until` or a spawned task.
    fn sleep(duration: Duration) -> Self::Sleep {
        let timer = tokio::time::sleep(duration);
        timer
    }

    /// Creates a `tokio::time::Interval` with the given `period`.
    // NOTE(review): tokio documents a panic for a zero period — confirm callers never pass one.
    fn interval(period: Duration) -> Self::Interval {
        let ticker = tokio::time::interval(period);
        ticker
    }

    /// Creates a sleeper with no sleep armed.
    fn new_sleeper() -> Self::Sleeper {
        let current_sleep = None;
        TokioSleeper { current_sleep }
    }
}
136
/// Implements `Interval` for `tokio::time::Interval` by delegating to Tokio's own `tick`.
#[cfg(feature = "tokio")]
impl Interval for tokio::time::Interval {
    /// Waits for the next tick and discards the value Tokio returns for it.
    async fn tick(&mut self) {
        // Inherent methods take precedence over trait methods during method resolution, so
        // `self.tick()` is Tokio's inherent `Interval::tick`, not a recursive call into this
        // trait method. Its result is intentionally dropped.
        let _ = self.tick().await;
    }
}
143
/// `Sleeper` implementation that stores at most one `tokio::time::Sleep`.
#[cfg(feature = "tokio")]
pub struct TokioSleeper {
    // `None` while no sleep is armed; set by `snooze` and cleared by `poll_sleep` once the
    // stored sleep completes. Accessed through pin projection, so it must never be moved out.
    current_sleep: Option<tokio::time::Sleep>,
}
148
/// Pin-projection helper for the stored sleep.
#[cfg(feature = "tokio")]
impl TokioSleeper {
    /// Projects the pinned sleeper to its stored sleep, if one is armed.
    fn current_sleep(self: Pin<&mut Self>) -> Option<Pin<&mut tokio::time::Sleep>> {
        // SAFETY: `current_sleep` is treated as structurally pinned. The code in this file
        // only reaches the stored `Sleep` through this projection and only overwrites the
        // slot in place, so the `Sleep` is never moved while `self` is pinned.
        let slot = unsafe { self.map_unchecked_mut(|this| &mut this.current_sleep) };
        slot.as_pin_mut()
    }
}
155
/// `Sleeper` implementation backed by `tokio::time::sleep`.
#[cfg(feature = "tokio")]
impl Sleeper for TokioSleeper {
    /// Starts a sleep of `duration` unless one is already stored.
    ///
    /// Returns `true` if a new sleep was started, or `false` if an earlier sleep is still
    /// stored (in which case `duration` is ignored).
    fn snooze(self: Pin<&mut Self>, duration: Duration) -> bool {
        // SAFETY: the slot is only written while it is `None`, so no pinned `Sleep` is moved
        // or replaced; the new `Sleep` is placed into the slot and stays there until dropped.
        let this = unsafe { self.get_unchecked_mut() };

        if this.current_sleep.is_some() {
            return false;
        }

        this.current_sleep = Some(tokio::time::sleep(duration));
        true
    }

    /// Polls the stored sleep; ready when it has completed or when nothing is stored.
    ///
    /// A completed sleep is cleared so that `snooze` can arm a new one.
    fn poll_sleep(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let Some(pending) = self.as_mut().current_sleep() else {
            // Nothing armed: report readiness immediately.
            return Poll::Ready(());
        };

        if pending.poll(cx).is_pending() {
            return Poll::Pending;
        }

        // SAFETY: assigning `None` drops the finished `Sleep` in place without moving it,
        // which is permitted for pinned data.
        let this = unsafe { self.get_unchecked_mut() };
        this.current_sleep = None;
        Poll::Ready(())
    }
}
185
186#[cfg(feature = "dioxus")]
187pub use ispawn::DioxusSpawner;
188
/// Stateless executor whose spawner is built from `ispawn::DioxusSpawner`.
#[cfg(feature = "dioxus")]
pub struct DioxusExecutor;
191
/// `ModrpcExecutor` implementation that spawns through `ispawn::DioxusSpawner` and takes its
/// timers from `async_timer`.
#[cfg(feature = "dioxus")]
impl ModrpcExecutor for DioxusExecutor {
    type Sleep = async_timer::timer::Platform;
    type Interval = async_timer::Interval;
    type Sleeper = AsyncTimerSleeper;

    /// Creates the (stateless) executor.
    fn new() -> Self {
        DioxusExecutor
    }

    /// Wraps an `ispawn::DioxusSpawner` in an `ispawn::LocalSpawner`.
    fn spawner(&mut self) -> ispawn::LocalSpawner {
        let inner = ispawn::DioxusSpawner;
        ispawn::LocalSpawner::new(inner)
    }

    /// Not supported by this executor.
    ///
    /// # Panics
    ///
    /// Always panics; this executor has no way to run a future to completion.
    fn run_until<R>(&mut self, _: impl Future<Output = R>) -> R {
        panic!("DioxusExecutor is only supported in single-threaded modrpc runtimes")
    }

    /// Creates an `async_timer` platform timer for `duration`.
    fn sleep(duration: Duration) -> Self::Sleep {
        let timer = async_timer::timer::Platform::new(duration);
        timer
    }

    /// Creates an `async_timer` interval with the given `period`.
    fn interval(period: Duration) -> Self::Interval {
        let ticker = async_timer::Interval::new(period);
        ticker
    }

    /// Creates a sleeper with no sleep armed.
    fn new_sleeper() -> Self::Sleeper {
        let current_sleep = None;
        AsyncTimerSleeper { current_sleep }
    }
}
220
221#[cfg(feature = "wasm-bindgen")]
222pub use ispawn::WasmBindgenSpawner;
223
/// Stateless executor whose spawner is built from `ispawn::WasmBindgenSpawner`.
#[cfg(feature = "wasm-bindgen")]
pub struct WasmBindgenExecutor;
226
/// `ModrpcExecutor` implementation that spawns through `ispawn::WasmBindgenSpawner` and takes
/// its timers from `async_timer`.
#[cfg(feature = "wasm-bindgen")]
impl ModrpcExecutor for WasmBindgenExecutor {
    type Sleep = async_timer::timer::Platform;
    type Interval = async_timer::Interval;
    type Sleeper = AsyncTimerSleeper;

    /// Creates the (stateless) executor.
    fn new() -> Self {
        WasmBindgenExecutor
    }

    /// Wraps an `ispawn::WasmBindgenSpawner` in an `ispawn::LocalSpawner`.
    fn spawner(&mut self) -> ispawn::LocalSpawner {
        let inner = ispawn::WasmBindgenSpawner;
        ispawn::LocalSpawner::new(inner)
    }

    /// Not supported by this executor.
    ///
    /// # Panics
    ///
    /// Always panics; this executor has no way to run a future to completion.
    fn run_until<R>(&mut self, _: impl Future<Output = R>) -> R {
        panic!("WasmBindgenExecutor is only supported in single-threaded modrpc runtimes")
    }

    /// Creates an `async_timer` platform timer for `duration`.
    fn sleep(duration: Duration) -> Self::Sleep {
        let timer = async_timer::timer::Platform::new(duration);
        timer
    }

    /// Creates an `async_timer` interval with the given `period`.
    fn interval(period: Duration) -> Self::Interval {
        let ticker = async_timer::Interval::new(period);
        ticker
    }

    /// Creates a sleeper with no sleep armed.
    fn new_sleeper() -> Self::Sleeper {
        let current_sleep = None;
        AsyncTimerSleeper { current_sleep }
    }
}
255
/// `Sleeper` implementation that stores at most one `async_timer` platform timer.
///
/// Compiled for the executors in this file that use `async_timer` for their timers.
#[cfg(any(
    feature = "dioxus",
    feature = "futures-executor",
    feature = "wasm-bindgen",
))]
pub struct AsyncTimerSleeper {
    // `None` while no sleep is armed; set by `snooze` and cleared by `poll_sleep` once the
    // stored timer completes. Accessed through pin projection, so it must never be moved out.
    current_sleep: Option<async_timer::timer::Platform>,
}
264
/// Pin-projection helper for the stored timer.
#[cfg(any(
    feature = "dioxus",
    feature = "futures-executor",
    feature = "wasm-bindgen",
))]
impl AsyncTimerSleeper {
    /// Projects the pinned sleeper to its stored timer, if one is armed.
    fn current_sleep(self: Pin<&mut Self>) -> Option<Pin<&mut async_timer::timer::Platform>> {
        // SAFETY: `current_sleep` is treated as structurally pinned. The code in this file
        // only reaches the stored timer through this projection and only overwrites the slot
        // in place, so the timer is never moved while `self` is pinned.
        let slot = unsafe { self.map_unchecked_mut(|this| &mut this.current_sleep) };
        slot.as_pin_mut()
    }
}
275
/// `Sleeper` implementation backed by `async_timer::timer::Platform`.
#[cfg(any(
    feature = "dioxus",
    feature = "futures-executor",
    feature = "wasm-bindgen",
))]
impl Sleeper for AsyncTimerSleeper {
    /// Starts a timer for `duration` unless one is already stored.
    ///
    /// Returns `true` if a new timer was started, or `false` if an earlier timer is still
    /// stored (in which case `duration` is ignored).
    fn snooze(self: Pin<&mut Self>, duration: Duration) -> bool {
        // SAFETY: the slot is only written while it is `None`, so no pinned timer is moved or
        // replaced; the new timer is placed into the slot and stays there until dropped.
        let this = unsafe { self.get_unchecked_mut() };

        if this.current_sleep.is_some() {
            return false;
        }

        this.current_sleep = Some(async_timer::timer::Platform::new(duration));
        true
    }

    /// Polls the stored timer; ready when it has completed or when nothing is stored.
    ///
    /// A completed timer is cleared so that `snooze` can arm a new one.
    fn poll_sleep(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let Some(pending) = self.as_mut().current_sleep() else {
            // Nothing armed: report readiness immediately.
            return Poll::Ready(());
        };

        if pending.poll(cx).is_pending() {
            return Poll::Pending;
        }

        // SAFETY: assigning `None` drops the finished timer in place without moving it,
        // which is permitted for pinned data.
        let this = unsafe { self.get_unchecked_mut() };
        this.current_sleep = None;
        Poll::Ready(())
    }
}