Skip to main content

wacore/
runtime.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::time::Duration;
4
5use async_trait::async_trait;
6
7/// A runtime-agnostic abstraction over async executor capabilities.
8///
9/// On native targets, futures must be `Send` (multi-threaded executors).
10/// On wasm32, `Send` is dropped (single-threaded).
11#[cfg(not(target_arch = "wasm32"))]
12#[async_trait]
13pub trait Runtime: Send + Sync + 'static {
14    fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) -> AbortHandle;
15    fn sleep(&self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>>;
16    fn spawn_blocking(
17        &self,
18        f: Box<dyn FnOnce() + Send + 'static>,
19    ) -> Pin<Box<dyn Future<Output = ()> + Send>>;
20
21    /// Cooperatively yield, allowing other tasks and I/O to make progress.
22    ///
23    /// Use this in tight async loops that process many items to avoid
24    /// starving other work. Returns `None` if yielding is unnecessary
25    /// (e.g. multi-threaded runtimes where other tasks run on separate
26    /// threads), or `Some(future)` that the caller must `.await` to
27    /// actually yield.
28    ///
29    /// Returning `None` avoids any allocation or async overhead, making
30    /// the call zero-cost on runtimes that don't need cooperative yielding.
31    fn yield_now(&self) -> Option<Pin<Box<dyn Future<Output = ()> + Send>>>;
32
33    /// How often to yield in tight loops (every N items). Defaults to 10.
34    /// Single-threaded runtimes should return 1 to avoid starving the event loop.
35    fn yield_frequency(&self) -> u32 {
36        10
37    }
38}
39
40/// WASM variant — `Send` bounds removed since WASM is single-threaded.
41/// Concrete types use `unsafe impl Send + Sync` since there's only one thread.
42#[cfg(target_arch = "wasm32")]
43#[async_trait(?Send)]
44pub trait Runtime: Send + Sync + 'static {
45    fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + 'static>>) -> AbortHandle;
46    fn sleep(&self, duration: Duration) -> Pin<Box<dyn Future<Output = ()>>>;
47    fn spawn_blocking(&self, f: Box<dyn FnOnce() + 'static>) -> Pin<Box<dyn Future<Output = ()>>>;
48
49    /// Cooperatively yield, allowing other tasks and I/O to make progress.
50    ///
51    /// Returns `None` if yielding is unnecessary, or `Some(future)` that
52    /// the caller must `.await` to actually yield.
53    fn yield_now(&self) -> Option<Pin<Box<dyn Future<Output = ()>>>>;
54
55    /// How often to yield in tight loops (every N items). Defaults to 10.
56    /// Single-threaded runtimes should return 1 to avoid starving the event loop.
57    fn yield_frequency(&self) -> u32 {
58        10
59    }
60}
61
62/// Handle returned by [`Runtime::spawn`]. Aborts the spawned task when dropped.
63///
64/// Uses `std::sync::Mutex` internally so that the handle is `Send + Sync`,
65/// which is required because it may be stored inside structs shared across
66/// tasks (e.g. `NoiseSocket` behind an `Arc`).
67pub struct AbortHandle {
68    abort_fn: std::sync::Mutex<Option<Box<dyn FnOnce() + Send + 'static>>>,
69}
70
71impl AbortHandle {
72    /// Create a new abort handle with the given cancellation function.
73    pub fn new(abort_fn: impl FnOnce() + Send + 'static) -> Self {
74        Self {
75            abort_fn: std::sync::Mutex::new(Some(Box::new(abort_fn))),
76        }
77    }
78
79    /// Create a no-op handle that does nothing on drop.
80    pub fn noop() -> Self {
81        Self {
82            abort_fn: std::sync::Mutex::new(None),
83        }
84    }
85
86    /// Explicitly abort the spawned task without waiting for drop.
87    pub fn abort(&self) {
88        if let Some(f) = self
89            .abort_fn
90            .lock()
91            .unwrap_or_else(|e| e.into_inner())
92            .take()
93        {
94            f();
95        }
96    }
97
98    /// Detach the handle so the task is NOT aborted on drop.
99    ///
100    /// The spawned task will run until completion even if the parent scope
101    /// is dropped. Use this for fire-and-forget tasks where cancellation
102    /// is not desired.
103    pub fn detach(self) {
104        *self.abort_fn.lock().unwrap_or_else(|e| e.into_inner()) = None;
105    }
106}
107
108impl Drop for AbortHandle {
109    fn drop(&mut self) {
110        self.abort();
111    }
112}
113
114/// Error returned when an async operation times out.
115#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
116#[error("operation timed out")]
117pub struct Elapsed;
118
119/// Race a future against a timeout. Returns [`Elapsed`] if the duration
120/// expires before the future completes.
121pub async fn timeout<F, T>(rt: &dyn Runtime, duration: Duration, future: F) -> Result<T, Elapsed>
122where
123    F: Future<Output = T>,
124{
125    use futures::future::Either;
126
127    futures::pin_mut!(future);
128    let sleep = rt.sleep(duration);
129    futures::pin_mut!(sleep);
130
131    match futures::future::select(future, sleep).await {
132        Either::Left((result, _)) => Ok(result),
133        Either::Right(((), _)) => Err(Elapsed),
134    }
135}
136
137/// Offload a blocking closure to a thread where blocking is acceptable,
138/// returning its result.
139///
140/// Convenience wrapper around [`Runtime::spawn_blocking`] that uses
141/// a oneshot channel to ferry the closure's return value back to the caller.
142///
143/// # Panics
144///
145/// Panics if the runtime drops the spawned task before it completes
146/// (e.g. during runtime shutdown).
147#[cfg(not(target_arch = "wasm32"))]
148pub async fn blocking<T: Send + 'static>(
149    rt: &dyn Runtime,
150    f: impl FnOnce() -> T + Send + 'static,
151) -> T {
152    let (tx, rx) = futures::channel::oneshot::channel();
153    rt.spawn_blocking(Box::new(move || {
154        let _ = tx.send(f());
155    }))
156    .await;
157    rx.await.unwrap_or_else(|_| {
158        panic!("blocking task failed to complete (closure panic or runtime shutdown)")
159    })
160}
161
162/// WASM variant — runs inline (single-threaded).
163#[cfg(target_arch = "wasm32")]
164pub async fn blocking<T: 'static>(_rt: &dyn Runtime, f: impl FnOnce() -> T + 'static) -> T {
165    f()
166}