wacore/runtime.rs
1use std::future::Future;
2use std::pin::Pin;
3use std::time::Duration;
4
5use async_trait::async_trait;
6
7/// A runtime-agnostic abstraction over async executor capabilities.
8///
9/// On native targets, futures must be `Send` (multi-threaded executors).
10/// On wasm32, `Send` is dropped (single-threaded).
11#[cfg(not(target_arch = "wasm32"))]
12#[async_trait]
13pub trait Runtime: Send + Sync + 'static {
14 fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) -> AbortHandle;
15 fn sleep(&self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>>;
16 fn spawn_blocking(
17 &self,
18 f: Box<dyn FnOnce() + Send + 'static>,
19 ) -> Pin<Box<dyn Future<Output = ()> + Send>>;
20
21 /// Cooperatively yield, allowing other tasks and I/O to make progress.
22 ///
23 /// Use this in tight async loops that process many items to avoid
24 /// starving other work. Returns `None` if yielding is unnecessary
25 /// (e.g. multi-threaded runtimes where other tasks run on separate
26 /// threads), or `Some(future)` that the caller must `.await` to
27 /// actually yield.
28 ///
29 /// Returning `None` avoids any allocation or async overhead, making
30 /// the call zero-cost on runtimes that don't need cooperative yielding.
31 fn yield_now(&self) -> Option<Pin<Box<dyn Future<Output = ()> + Send>>>;
32
33 /// How often to yield in tight loops (every N items). Defaults to 10.
34 /// Single-threaded runtimes should return 1 to avoid starving the event loop.
35 fn yield_frequency(&self) -> u32 {
36 10
37 }
38}
39
/// wasm32 definition of the runtime abstraction.
///
/// The `Send` bounds on futures and closures are removed since WASM is
/// single-threaded. The trait itself still requires `Send + Sync`; concrete
/// types use `unsafe impl Send + Sync` since there's only one thread.
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
pub trait Runtime: Send + Sync + 'static {
    /// Spawn `future` as a task on the executor.
    ///
    /// The returned [`AbortHandle`] aborts the task when it is dropped, so
    /// callers must keep it alive (or detach it) for as long as the task
    /// should run.
    fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + 'static>>) -> AbortHandle;

    /// Return a future that acts as a timer for `duration`.
    ///
    /// [`timeout`] races this future against the user's future and reports
    /// [`Elapsed`] when this one finishes first.
    fn sleep(&self, duration: Duration) -> Pin<Box<dyn Future<Output = ()>>>;

    /// Hand the blocking closure `f` to the runtime for execution.
    fn spawn_blocking(&self, f: Box<dyn FnOnce() + 'static>) -> Pin<Box<dyn Future<Output = ()>>>;

    /// Cooperatively yield, giving other tasks and I/O a chance to progress.
    ///
    /// * `None` — yielding is unnecessary; there is nothing to await.
    /// * `Some(future)` — the caller must `.await` the future to actually
    ///   yield.
    fn yield_now(&self) -> Option<Pin<Box<dyn Future<Output = ()>>>>;

    /// Number of items to process between yields in tight loops.
    ///
    /// Defaults to 10. Single-threaded runtimes should return 1 to avoid
    /// starving the event loop.
    fn yield_frequency(&self) -> u32 {
        10
    }
}
61
/// Handle returned by [`Runtime::spawn`]. Aborts the spawned task when dropped.
///
/// Uses `std::sync::Mutex` internally so that the handle is `Send + Sync`,
/// which is required because it may be stored inside structs shared across
/// tasks (e.g. `NoiseSocket` behind an `Arc`).
///
/// The cancellation callback runs at most once, whether it is triggered by
/// [`AbortHandle::abort`] or by dropping the handle. It is never invoked
/// while the internal lock is held.
#[must_use = "dropping an `AbortHandle` aborts the task; call `detach()` to let it keep running"]
pub struct AbortHandle {
    /// Pending cancellation callback. `None` for no-op handles and once the
    /// callback has been taken by `abort` or discarded by `detach`.
    abort_fn: std::sync::Mutex<Option<Box<dyn FnOnce() + Send + 'static>>>,
}

impl AbortHandle {
    /// Create a new abort handle with the given cancellation function.
    ///
    /// `abort_fn` is called at most once: on the first [`AbortHandle::abort`]
    /// call or, failing that, when the handle is dropped.
    pub fn new(abort_fn: impl FnOnce() + Send + 'static) -> Self {
        Self {
            abort_fn: std::sync::Mutex::new(Some(Box::new(abort_fn))),
        }
    }

    /// Create a no-op handle that does nothing on drop.
    pub fn noop() -> Self {
        Self {
            abort_fn: std::sync::Mutex::new(None),
        }
    }

    /// Explicitly abort the spawned task without waiting for drop.
    ///
    /// Only the first call has an effect; later calls (including the implicit
    /// one on drop) find no callback left and do nothing. A poisoned lock is
    /// recovered rather than propagated so that aborting never panics on its
    /// own account.
    pub fn abort(&self) {
        // Take the callback in its own statement: the `MutexGuard` temporary
        // is released at the end of this `let`, *before* the callback runs.
        // Writing this as `if let Some(f) = self.abort_fn.lock()...take()`
        // would keep the guard alive for the whole `if let` body, so a
        // callback that touches this handle again would deadlock and a
        // panicking callback would poison the mutex.
        let callback = self
            .abort_fn
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .take();

        if let Some(callback) = callback {
            callback();
        }
    }

    /// Detach the handle so the task is NOT aborted on drop.
    ///
    /// The spawned task will run until completion even if the parent scope
    /// is dropped. Use this for fire-and-forget tasks where cancellation
    /// is not desired.
    pub fn detach(mut self) {
        // The handle is owned here, so `get_mut` gives direct access without
        // locking. Clearing the slot discards the callback uncalled; the
        // `Drop` impl that runs when `self` goes out of scope then finds
        // nothing to invoke.
        *self.abort_fn.get_mut().unwrap_or_else(|e| e.into_inner()) = None;
    }
}

impl Drop for AbortHandle {
    /// Abort the task unless it was already aborted or the handle was detached.
    fn drop(&mut self) {
        self.abort();
    }
}
113
114/// Error returned when an async operation times out.
115#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
116#[error("operation timed out")]
117pub struct Elapsed;
118
119/// Race a future against a timeout. Returns [`Elapsed`] if the duration
120/// expires before the future completes.
121pub async fn timeout<F, T>(rt: &dyn Runtime, duration: Duration, future: F) -> Result<T, Elapsed>
122where
123 F: Future<Output = T>,
124{
125 use futures::future::Either;
126
127 futures::pin_mut!(future);
128 let sleep = rt.sleep(duration);
129 futures::pin_mut!(sleep);
130
131 match futures::future::select(future, sleep).await {
132 Either::Left((result, _)) => Ok(result),
133 Either::Right(((), _)) => Err(Elapsed),
134 }
135}
136
137/// Offload a blocking closure to a thread where blocking is acceptable,
138/// returning its result.
139///
140/// Convenience wrapper around [`Runtime::spawn_blocking`] that uses
141/// a oneshot channel to ferry the closure's return value back to the caller.
142///
143/// # Panics
144///
145/// Panics if the runtime drops the spawned task before it completes
146/// (e.g. during runtime shutdown).
147#[cfg(not(target_arch = "wasm32"))]
148pub async fn blocking<T: Send + 'static>(
149 rt: &dyn Runtime,
150 f: impl FnOnce() -> T + Send + 'static,
151) -> T {
152 let (tx, rx) = futures::channel::oneshot::channel();
153 rt.spawn_blocking(Box::new(move || {
154 let _ = tx.send(f());
155 }))
156 .await;
157 rx.await.unwrap_or_else(|_| {
158 panic!("blocking task failed to complete (closure panic or runtime shutdown)")
159 })
160}
161
/// wasm32 counterpart of `blocking`: the closure is simply called in place.
///
/// WASM is single-threaded, so there is nowhere to offload the work to. The
/// runtime argument is accepted only to keep the signature aligned with the
/// native version and is not used.
#[cfg(target_arch = "wasm32")]
pub async fn blocking<T: 'static>(
    _rt: &dyn Runtime,
    f: impl FnOnce() -> T + 'static,
) -> T {
    f()
}