fusio/executor/
mod.rs

1use core::{pin::Pin, time::Duration};
2#[cfg(not(target_arch = "wasm32"))]
3pub use std::time::Instant;
4use std::{
5    error::Error,
6    future::Future,
7    ops::{Deref, DerefMut},
8    sync::Arc,
9    time::SystemTime,
10};
11
12use async_lock::{
13    Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock as AsyncRwLock, RwLockReadGuard,
14    RwLockWriteGuard,
15};
16use fusio_core::{MaybeSend, MaybeSendFuture, MaybeSync};
17#[cfg(not(target_arch = "wasm32"))]
18use futures_executor::block_on;
19#[cfg(all(target_arch = "wasm32", feature = "executor-web"))]
20use js_sys::Date;
21#[cfg(target_arch = "wasm32")]
22pub use web_time::Instant;
23
24pub trait JoinHandle<R> {
25    fn join(self) -> impl Future<Output = Result<R, Box<dyn Error + Send + Sync>>> + MaybeSend;
26}
27
28pub trait Mutex<T> {
29    type Guard<'a>: DerefMut<Target = T> + MaybeSend + 'a
30    where
31        Self: 'a;
32
33    fn lock(&self) -> impl Future<Output = Self::Guard<'_>> + MaybeSend;
34}
35
36pub trait RwLock<T> {
37    type ReadGuard<'a>: Deref<Target = T> + MaybeSend + 'a
38    where
39        Self: 'a;
40
41    type WriteGuard<'a>: DerefMut<Target = T> + MaybeSend + 'a
42    where
43        Self: 'a;
44
45    fn read(&self) -> impl Future<Output = Self::ReadGuard<'_>> + MaybeSend;
46
47    fn write(&self) -> impl Future<Output = Self::WriteGuard<'_>> + MaybeSend;
48}
49
50pub trait Executor: MaybeSend + MaybeSync + 'static {
51    type JoinHandle<R>: JoinHandle<R> + MaybeSend
52    where
53        R: MaybeSend;
54
55    type Mutex<T>: Mutex<T> + MaybeSend + MaybeSync
56    where
57        T: MaybeSend + MaybeSync;
58
59    type RwLock<T>: RwLock<T> + MaybeSend + MaybeSync
60    where
61        T: MaybeSend + MaybeSync;
62
63    fn spawn<F>(&self, future: F) -> Self::JoinHandle<F::Output>
64    where
65        F: Future + MaybeSend + 'static,
66        F::Output: MaybeSend;
67
68    fn mutex<T>(value: T) -> Self::Mutex<T>
69    where
70        T: MaybeSend + MaybeSync;
71
72    fn rw_lock<T>(value: T) -> Self::RwLock<T>
73    where
74        T: MaybeSend + MaybeSync;
75}
76
77/// Minimal timer abstraction to decouple libraries from concrete runtimes.
78pub trait Timer: MaybeSend + MaybeSync + 'static {
79    /// Sleep for the given duration and yield back to the runtime.
80    fn sleep(&self, dur: Duration) -> Pin<Box<dyn MaybeSendFuture<Output = ()>>>;
81
82    /// Return a monotonic instant suitable for measuring elapsed time.
83    fn now(&self) -> Instant;
84
85    /// Return the current wall-clock time.
86    fn system_time(&self) -> SystemTime;
87}
88
89#[cfg(all(feature = "executor-web", target_arch = "wasm32"))]
90pub mod web;
91
92#[cfg(all(feature = "opfs", target_arch = "wasm32"))]
93pub mod opfs;
94
95#[cfg(feature = "executor-tokio")]
96pub mod tokio;
97
98#[cfg(feature = "monoio")]
99pub mod monoio;
100
101/// A join handle that holds a pre-computed result.
102#[derive(Debug, Clone)]
103pub struct NoopJoinHandle<R>(Option<R>);
104
105impl<R> JoinHandle<R> for NoopJoinHandle<R>
106where
107    R: MaybeSend,
108{
109    fn join(self) -> impl Future<Output = Result<R, Box<dyn Error + Send + Sync>>> + MaybeSend {
110        let mut value = self.0;
111        async move {
112            let out = value.take().expect("noop join handle already taken");
113            Ok(out)
114        }
115    }
116}
117
118/// A Mutex implementation using async_lock, available on all platforms.
119#[derive(Debug)]
120pub struct NoopMutex<T>(AsyncMutex<T>);
121
122impl<T> NoopMutex<T> {
123    pub fn new(value: T) -> Self {
124        Self(AsyncMutex::new(value))
125    }
126}
127
128impl<T> Mutex<T> for NoopMutex<T>
129where
130    T: MaybeSend + MaybeSync,
131{
132    type Guard<'a>
133        = AsyncMutexGuard<'a, T>
134    where
135        T: 'a,
136        Self: 'a;
137
138    fn lock(&self) -> impl Future<Output = Self::Guard<'_>> + MaybeSend {
139        self.0.lock()
140    }
141}
142
143/// An RwLock implementation using async_lock, available on all platforms.
144#[derive(Debug)]
145pub struct NoopRwLock<T>(AsyncRwLock<T>);
146
147impl<T> NoopRwLock<T> {
148    pub fn new(value: T) -> Self {
149        Self(AsyncRwLock::new(value))
150    }
151}
152
153impl<T> RwLock<T> for NoopRwLock<T>
154where
155    T: MaybeSend + MaybeSync,
156{
157    type ReadGuard<'a>
158        = RwLockReadGuard<'a, T>
159    where
160        T: 'a,
161        Self: 'a;
162
163    type WriteGuard<'a>
164        = RwLockWriteGuard<'a, T>
165    where
166        T: 'a,
167        Self: 'a;
168
169    fn read(&self) -> impl Future<Output = Self::ReadGuard<'_>> + MaybeSend {
170        self.0.read()
171    }
172
173    fn write(&self) -> impl Future<Output = Self::WriteGuard<'_>> + MaybeSend {
174        self.0.write()
175    }
176}
177
178impl<T> Timer for Arc<T>
179where
180    T: Timer + ?Sized,
181{
182    fn sleep(&self, dur: Duration) -> Pin<Box<dyn MaybeSendFuture<Output = ()>>> {
183        (**self).sleep(dur)
184    }
185
186    fn now(&self) -> Instant {
187        (**self).now()
188    }
189
190    fn system_time(&self) -> SystemTime {
191        (**self).system_time()
192    }
193}
194
195/// A minimal executor that runs futures synchronously (on non-WASM) or provides
196/// no-op timer functionality. Available on all platforms.
197///
198/// On non-WASM platforms, `spawn` executes futures synchronously using `block_on`.
199/// On WASM platforms, `spawn` will panic - use `WebExecutor` instead for actual
200/// task spawning.
201#[derive(Debug, Clone, Default, Copy)]
202pub struct NoopExecutor;
203
204impl Executor for NoopExecutor {
205    type JoinHandle<R>
206        = NoopJoinHandle<R>
207    where
208        R: MaybeSend;
209
210    type Mutex<T>
211        = NoopMutex<T>
212    where
213        T: MaybeSend + MaybeSync;
214
215    type RwLock<T>
216        = NoopRwLock<T>
217    where
218        T: MaybeSend + MaybeSync;
219
220    #[cfg(not(target_arch = "wasm32"))]
221    fn spawn<F>(&self, future: F) -> Self::JoinHandle<F::Output>
222    where
223        F: Future + MaybeSend + 'static,
224        F::Output: MaybeSend,
225    {
226        NoopJoinHandle(Some(block_on(future)))
227    }
228
229    #[cfg(target_arch = "wasm32")]
230    fn spawn<F>(&self, _future: F) -> Self::JoinHandle<F::Output>
231    where
232        F: Future + MaybeSend + 'static,
233        F::Output: MaybeSend,
234    {
235        panic!("NoopExecutor::spawn is not supported on WASM. Use WebExecutor instead.")
236    }
237
238    fn mutex<T>(value: T) -> Self::Mutex<T>
239    where
240        T: MaybeSend + MaybeSync,
241    {
242        NoopMutex::new(value)
243    }
244
245    fn rw_lock<T>(value: T) -> Self::RwLock<T>
246    where
247        T: MaybeSend + MaybeSync,
248    {
249        NoopRwLock::new(value)
250    }
251}
252
253impl Timer for NoopExecutor {
254    fn sleep(&self, _dur: Duration) -> Pin<Box<dyn MaybeSendFuture<Output = ()>>> {
255        Box::pin(async move {})
256    }
257
258    fn now(&self) -> Instant {
259        Instant::now()
260    }
261
262    #[cfg(all(target_arch = "wasm32", feature = "executor-web"))]
263    fn system_time(&self) -> SystemTime {
264        SystemTime::UNIX_EPOCH + Duration::from_millis(Date::now() as u64)
265    }
266
267    #[cfg(not(all(target_arch = "wasm32", feature = "executor-web")))]
268    fn system_time(&self) -> SystemTime {
269        SystemTime::now()
270    }
271}
272
273/// Backward-compatible alias for `NoopExecutor`.
274#[cfg(not(target_arch = "wasm32"))]
275#[deprecated(since = "0.5.0", note = "Use NoopExecutor instead")]
276pub type BlockingExecutor = NoopExecutor;
277
278/// Backward-compatible alias for `NoopJoinHandle`.
279#[cfg(not(target_arch = "wasm32"))]
280#[deprecated(since = "0.5.0", note = "Use NoopJoinHandle instead")]
281pub type BlockingJoinHandle<R> = NoopJoinHandle<R>;
282
283/// Backward-compatible alias for `NoopRwLock`.
284#[cfg(not(target_arch = "wasm32"))]
285#[deprecated(since = "0.5.0", note = "Use NoopRwLock instead")]
286pub type BlockingRwLock<T> = NoopRwLock<T>;
287
288/// A blocking fallback for environments without an async runtime.
289#[cfg(not(target_arch = "wasm32"))]
290#[derive(Debug, Default, Clone, Copy)]
291pub struct BlockingSleeper;
292
293#[cfg(not(target_arch = "wasm32"))]
294impl Timer for BlockingSleeper {
295    fn sleep(&self, dur: Duration) -> Pin<Box<dyn MaybeSendFuture<Output = ()>>> {
296        Box::pin(async move { std::thread::sleep(dur) })
297    }
298
299    fn now(&self) -> Instant {
300        Instant::now()
301    }
302
303    fn system_time(&self) -> SystemTime {
304        SystemTime::now()
305    }
306}
307
308/// Timer that never sleeps and always reports current time.
309#[derive(Debug, Default, Clone, Copy)]
310pub struct NoopTimer;
311
312impl Timer for NoopTimer {
313    fn sleep(&self, _dur: Duration) -> Pin<Box<dyn MaybeSendFuture<Output = ()>>> {
314        Box::pin(async move {})
315    }
316
317    fn now(&self) -> Instant {
318        Instant::now()
319    }
320
321    #[cfg(all(target_arch = "wasm32", feature = "executor-web"))]
322    fn system_time(&self) -> SystemTime {
323        SystemTime::UNIX_EPOCH + Duration::from_millis(Date::now() as u64)
324    }
325
326    #[cfg(not(all(target_arch = "wasm32", feature = "executor-web")))]
327    fn system_time(&self) -> SystemTime {
328        SystemTime::now()
329    }
330}