Skip to main content

modrpc_executor/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2
3use core::{
4    future::Future,
5    pin::Pin,
6    time::Duration,
7    task::{Context, Poll},
8};
9#[cfg(feature = "std")]
10use std::rc::Rc;
11
12pub trait ModrpcExecutor {
13    type Sleep: Future<Output = ()> + 'static;
14    type Interval: Interval + 'static;
15    type Sleeper: Sleeper + 'static;
16
17    fn new() -> Self;
18    fn spawner(&mut self) -> ispawn::LocalSpawner;
19    fn run_until<R>(&mut self, future: impl Future<Output = R>) -> R;
20    fn sleep(duration: Duration) -> Self::Sleep;
21    fn interval(period: Duration) -> Self::Interval;
22    fn new_sleeper() -> Self::Sleeper;
23}
24
/// A periodic timer whose ticks can be awaited one at a time.
// The `async_fn_in_trait` lint is silenced for the whole trait rather than per
// method; the returned futures are not required to be `Send`.
#[allow(async_fn_in_trait)]
pub trait Interval {
    /// Waits for the next tick of the timer.
    async fn tick(&mut self);
}
29
/// A dyn-compatible interface for polling one sleep `Future` at a time.
///
/// A `Sleeper` owns at most one pending sleep. It is armed with
/// [`Sleeper::snooze`] and driven with [`Sleeper::poll_sleep`]. Both methods
/// take `Pin<&mut Self>` so implementations may store a `!Unpin` timer inline.
pub trait Sleeper {
    /// Requests a sleep of `duration`.
    ///
    /// The implementations in this crate return `true` when a new sleep was
    /// armed, and `false` when a sleep was already pending (in which case the
    /// pending sleep is left untouched and `duration` is ignored).
    fn snooze(self: Pin<&mut Self>, duration: Duration) -> bool;

    /// Polls the currently armed sleep.
    ///
    /// Returns `Poll::Pending` while the sleep has not elapsed. The
    /// implementations in this crate return `Poll::Ready(())` once it has
    /// elapsed (disarming the sleeper) and also when no sleep is armed at all.
    fn poll_sleep(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;
}
38
/// Adapts `async_timer::Interval` to the crate's [`Interval`] trait.
#[cfg(feature = "async-timer")]
impl Interval for async_timer::Interval {
    /// Awaits the interval itself; each await completes on the next tick.
    async fn tick(&mut self) {
        let next_tick = self;
        next_tick.await;
    }
}
45
/// Executor backed by a `futures_executor::LocalPool`, with timers provided by
/// `async_timer`.
#[cfg(feature = "futures-executor")]
pub struct FuturesExecutor {
    /// Pool that owns and drives the locally spawned tasks.
    local_pool: futures_executor::LocalPool,
}

#[cfg(feature = "futures-executor")]
impl ModrpcExecutor for FuturesExecutor {
    type Sleep = async_timer::timer::Platform;
    type Interval = async_timer::Interval;
    type Sleeper = AsyncTimerSleeper;

    /// Creates an executor around a fresh, empty `LocalPool`.
    fn new() -> Self {
        Self {
            local_pool: futures_executor::LocalPool::new(),
        }
    }

    /// Wraps the pool's own spawner in an `ispawn::LocalSpawner`.
    fn spawner(&mut self) -> ispawn::LocalSpawner {
        let pool_spawner = Rc::new(self.local_pool.spawner());
        ispawn::LocalSpawner::new(pool_spawner)
    }

    /// Runs the pool until `future` completes and returns its output.
    fn run_until<R>(&mut self, future: impl Future<Output = R>) -> R {
        let pool = &mut self.local_pool;
        pool.run_until(future)
    }

    /// Creates an `async_timer` platform timer for `duration`.
    fn sleep(duration: Duration) -> Self::Sleep {
        async_timer::timer::Platform::new(duration)
    }

    /// Creates an `async_timer` interval with the given `period`.
    fn interval(period: Duration) -> Self::Interval {
        async_timer::Interval::new(period)
    }

    /// Creates a sleeper with no sleep armed.
    fn new_sleeper() -> Self::Sleeper {
        let current_sleep = None;
        AsyncTimerSleeper { current_sleep }
    }
}
82
/// Executor backed by a Tokio runtime paired with a `LocalSet` for `!Send`
/// tasks.
#[cfg(feature = "tokio")]
pub struct TokioExecutor {
    /// Runtime used to drive the local set.
    rt: tokio::runtime::Runtime,
    /// Task set shared (via `Rc`) with the spawners handed out by `spawner`.
    local_set: Rc<tokio::task::LocalSet>,
}

#[cfg(feature = "tokio")]
impl TokioExecutor {
    /// Borrows the `LocalSet` that local tasks are spawned onto.
    pub fn local_set(&self) -> &tokio::task::LocalSet {
        &*self.local_set
    }

    /// Borrows the underlying Tokio runtime.
    pub fn tokio_runtime(&self) -> &tokio::runtime::Runtime {
        let Self { rt, .. } = self;
        rt
    }
}
99
#[cfg(feature = "tokio")]
impl ModrpcExecutor for TokioExecutor {
    type Sleep = tokio::time::Sleep;
    type Interval = tokio::time::Interval;
    type Sleeper = TokioSleeper;

    /// Builds a current-thread Tokio runtime with all drivers enabled, plus an
    /// empty `LocalSet`.
    ///
    /// # Panics
    ///
    /// Panics if the Tokio runtime cannot be built.
    fn new() -> Self {
        let mut builder = tokio::runtime::Builder::new_current_thread();
        builder.enable_all();
        let rt = builder.build().expect("new modrpc tokio runtime");

        Self {
            rt,
            local_set: Rc::new(tokio::task::LocalSet::new()),
        }
    }

    /// Returns a spawner that shares this executor's `LocalSet`.
    fn spawner(&mut self) -> ispawn::LocalSpawner {
        let shared_set = Rc::clone(&self.local_set);
        ispawn::LocalSpawner::new(shared_set)
    }

    /// Blocks on `future` inside the local set, driven by the owned runtime.
    fn run_until<R>(&mut self, future: impl Future<Output = R>) -> R {
        let runtime = &self.rt;
        self.local_set.block_on(runtime, future)
    }

    /// Creates a Tokio sleep future for `duration`.
    fn sleep(duration: Duration) -> Self::Sleep {
        tokio::time::sleep(duration)
    }

    /// Creates a Tokio interval with the given `period`.
    fn interval(period: Duration) -> Self::Interval {
        tokio::time::interval(period)
    }

    /// Creates a sleeper with no sleep armed.
    fn new_sleeper() -> Self::Sleeper {
        let current_sleep = None;
        TokioSleeper { current_sleep }
    }
}
136
/// Adapts `tokio::time::Interval` to the crate's [`Interval`] trait.
#[cfg(feature = "tokio")]
impl Interval for tokio::time::Interval {
    /// Awaits Tokio's inherent `Interval::tick` and discards the value it
    /// yields.
    async fn tick(&mut self) {
        // Name the inherent method explicitly so it is obvious this does not
        // recurse into the trait method being defined here.
        let _ = tokio::time::Interval::tick(self).await;
    }
}
143
/// [`Sleeper`] implementation that stores at most one `tokio::time::Sleep`
/// inline.
#[cfg(feature = "tokio")]
pub struct TokioSleeper {
    /// The armed sleep, or `None` when the sleeper is idle.
    current_sleep: Option<tokio::time::Sleep>,
}

#[cfg(feature = "tokio")]
impl TokioSleeper {
    /// Projects the pin from the sleeper onto the armed sleep, if there is one.
    fn current_sleep(self: Pin<&mut Self>) -> Option<Pin<&mut tokio::time::Sleep>> {
        // SAFETY: `current_sleep` is treated as structurally pinned. The
        // `Sleep` is never moved out of the option while `self` is pinned; it
        // is only created in place by `snooze` and dropped in place by
        // `poll_sleep`.
        let slot = unsafe { self.map_unchecked_mut(|this| &mut this.current_sleep) };
        slot.as_pin_mut()
    }
}
155
#[cfg(feature = "tokio")]
impl Sleeper for TokioSleeper {
    /// Arms a new sleep of `duration` unless one is already pending.
    ///
    /// Returns `true` if the sleep was armed, `false` if an earlier sleep is
    /// still pending (it is left untouched).
    fn snooze(self: Pin<&mut Self>, duration: Duration) -> bool {
        // SAFETY: The slot is only written while it is empty, so an existing
        // pinned `Sleep` is never moved or overwritten.
        let this = unsafe { self.get_unchecked_mut() };
        if this.current_sleep.is_some() {
            return false;
        }
        this.current_sleep = Some(tokio::time::sleep(duration));
        true
    }

    /// Polls the armed sleep; completes immediately when nothing is armed.
    ///
    /// Once the sleep elapses it is dropped, so the sleeper can be re-armed.
    fn poll_sleep(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let Some(sleep) = self.as_mut().current_sleep() else {
            return Poll::Ready(());
        };
        if sleep.poll(cx).is_pending() {
            return Poll::Pending;
        }
        // SAFETY: The finished `Sleep` is dropped in place and nothing keeps a
        // reference to it afterwards.
        unsafe { self.get_unchecked_mut() }.current_sleep = None;
        Poll::Ready(())
    }
}
185
186#[cfg(feature = "dioxus")]
187pub use ispawn::DioxusSpawner;
188
/// Executor that spawns tasks through Dioxus and uses `async_timer` for
/// timers. It cannot block on a future itself.
#[cfg(feature = "dioxus")]
pub struct DioxusExecutor;

#[cfg(feature = "dioxus")]
impl ModrpcExecutor for DioxusExecutor {
    type Sleep = async_timer::timer::Platform;
    type Interval = async_timer::Interval;
    type Sleeper = AsyncTimerSleeper;

    /// Creates the (stateless) executor.
    fn new() -> Self {
        DioxusExecutor
    }

    /// Returns a local spawner backed by `ispawn::DioxusSpawner`.
    fn spawner(&mut self) -> ispawn::LocalSpawner {
        let dioxus_spawner = ispawn::DioxusSpawner;
        ispawn::LocalSpawner::new(dioxus_spawner)
    }

    /// Not supported by this executor.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn run_until<R>(&mut self, _: impl Future<Output = R>) -> R {
        panic!("DioxusExecutor is only supported in single-threaded modrpc runtimes");
    }

    /// Creates an `async_timer` platform timer for `duration`.
    fn sleep(duration: Duration) -> Self::Sleep {
        async_timer::timer::Platform::new(duration)
    }

    /// Creates an `async_timer` interval with the given `period`.
    fn interval(period: Duration) -> Self::Interval {
        async_timer::Interval::new(period)
    }

    /// Creates a sleeper with no sleep armed.
    fn new_sleeper() -> Self::Sleeper {
        let current_sleep = None;
        AsyncTimerSleeper { current_sleep }
    }
}
220
221#[cfg(feature = "wasm-bindgen")]
222pub use ispawn::WasmBindgenSpawner;
223
/// Executor that spawns tasks through wasm-bindgen and uses `async_timer` for
/// timers. It cannot block on a future itself.
#[cfg(feature = "wasm-bindgen")]
pub struct WasmBindgenExecutor;

#[cfg(feature = "wasm-bindgen")]
impl ModrpcExecutor for WasmBindgenExecutor {
    type Sleep = async_timer::timer::Platform;
    type Interval = async_timer::Interval;
    type Sleeper = AsyncTimerSleeper;

    /// Creates the (stateless) executor.
    fn new() -> Self {
        WasmBindgenExecutor
    }

    /// Returns a local spawner backed by `ispawn::WasmBindgenSpawner`.
    fn spawner(&mut self) -> ispawn::LocalSpawner {
        let wasm_spawner = ispawn::WasmBindgenSpawner;
        ispawn::LocalSpawner::new(wasm_spawner)
    }

    /// Not supported by this executor.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn run_until<R>(&mut self, _: impl Future<Output = R>) -> R {
        panic!("WasmBindgenExecutor is only supported in single-threaded modrpc runtimes");
    }

    /// Creates an `async_timer` platform timer for `duration`.
    fn sleep(duration: Duration) -> Self::Sleep {
        async_timer::timer::Platform::new(duration)
    }

    /// Creates an `async_timer` interval with the given `period`.
    fn interval(period: Duration) -> Self::Interval {
        async_timer::Interval::new(period)
    }

    /// Creates a sleeper with no sleep armed.
    fn new_sleeper() -> Self::Sleeper {
        let current_sleep = None;
        AsyncTimerSleeper { current_sleep }
    }
}
255
/// [`Sleeper`] implementation that stores at most one `async_timer` platform
/// timer inline.
#[cfg(any(
    feature = "dioxus",
    feature = "futures-executor",
    feature = "wasm-bindgen",
))]
pub struct AsyncTimerSleeper {
    /// The armed timer, or `None` when the sleeper is idle.
    current_sleep: Option<async_timer::timer::Platform>,
}

#[cfg(any(
    feature = "dioxus",
    feature = "futures-executor",
    feature = "wasm-bindgen",
))]
impl AsyncTimerSleeper {
    /// Projects the pin from the sleeper onto the armed timer, if there is one.
    fn current_sleep(self: Pin<&mut Self>) -> Option<Pin<&mut async_timer::timer::Platform>> {
        // SAFETY: `current_sleep` is treated as structurally pinned. The timer
        // is never moved out of the option while `self` is pinned; it is only
        // created in place by `snooze` and dropped in place by `poll_sleep`.
        let slot = unsafe { self.map_unchecked_mut(|this| &mut this.current_sleep) };
        slot.as_pin_mut()
    }
}
275
#[cfg(any(
    feature = "dioxus",
    feature = "futures-executor",
    feature = "wasm-bindgen",
))]
impl Sleeper for AsyncTimerSleeper {
    /// Arms a new timer for `duration` unless one is already pending.
    ///
    /// Returns `true` if the timer was armed, `false` if an earlier one is
    /// still pending (it is left untouched).
    fn snooze(self: Pin<&mut Self>, duration: Duration) -> bool {
        // SAFETY: The slot is only written while it is empty, so an existing
        // pinned timer is never moved or overwritten.
        let this = unsafe { self.get_unchecked_mut() };
        if this.current_sleep.is_some() {
            return false;
        }
        this.current_sleep = Some(async_timer::timer::Platform::new(duration));
        true
    }

    /// Polls the armed timer; completes immediately when nothing is armed.
    ///
    /// Once the timer fires it is dropped, so the sleeper can be re-armed.
    fn poll_sleep(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let Some(sleep) = self.as_mut().current_sleep() else {
            return Poll::Ready(());
        };
        if sleep.poll(cx).is_pending() {
            return Poll::Pending;
        }
        // SAFETY: The finished timer is dropped in place and nothing keeps a
        // reference to it afterwards.
        unsafe { self.get_unchecked_mut() }.current_sleep = None;
        Poll::Ready(())
    }
}