any_spawner/lib.rs
1//! This crate makes it easier to write asynchronous code that is executor-agnostic, by providing a
2//! utility that can be used to spawn tasks in a variety of executors.
3//!
4//! It only supports single executor per program, but that executor can be set at runtime, anywhere
5//! in your crate (or an application that depends on it).
6//!
7//! This can be extended to support any executor or runtime that supports spawning [`Future`]s.
8//!
9//! This is a least common denominator implementation in many ways. Limitations include:
10//! - setting an executor is a one-time, global action
11//! - no "join handle" or other result is returned from the spawn
12//! - the `Future` must output `()`
13//!
14//! ```no_run
15//! use any_spawner::Executor;
16//!
17//! // make sure an Executor has been initialized with one of the init_ functions
18//!
19//! // spawn a thread-safe Future
20//! Executor::spawn(async { /* ... */ });
21//!
22//! // spawn a Future that is !Send
23//! Executor::spawn_local(async { /* ... */ });
24//! ```
25
26#![forbid(unsafe_code)]
27#![deny(missing_docs)]
28#![cfg_attr(docsrs, feature(doc_cfg))]
29
30use std::{future::Future, pin::Pin, sync::OnceLock};
31use thiserror::Error;
32
33/// A future that has been pinned.
34pub type PinnedFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
35/// A future that has been pinned.
36pub type PinnedLocalFuture<T> = Pin<Box<dyn Future<Output = T>>>;
37
38// Type alias for the spawn function pointer.
39type SpawnFn = fn(PinnedFuture<()>);
40// Type alias for the spawn_local function pointer.
41type SpawnLocalFn = fn(PinnedLocalFuture<()>);
42// Type alias for the poll_local function pointer.
43type PollLocalFn = fn();
44
45/// Holds the function pointers for the current global executor.
46#[derive(Clone, Copy)]
47struct ExecutorFns {
48 spawn: SpawnFn,
49 spawn_local: SpawnLocalFn,
50 poll_local: PollLocalFn,
51}
52
53// Use a single OnceLock to ensure atomic initialization of all functions.
54static EXECUTOR_FNS: OnceLock<ExecutorFns> = OnceLock::new();
55
56// No-op functions to use when an executor doesn't support a specific operation.
57#[cfg(any(feature = "tokio", feature = "wasm-bindgen", feature = "glib"))]
58#[cold]
59#[inline(never)]
60fn no_op_poll() {}
61
62#[cfg(all(not(feature = "wasm-bindgen"), not(debug_assertions)))]
63#[cold]
64#[inline(never)]
65fn no_op_spawn(_: PinnedFuture<()>) {
66 #[cfg(debug_assertions)]
67 eprintln!(
68 "Warning: Executor::spawn called, but no global 'spawn' function is \
69 configured (perhaps only spawn_local is supported, e.g., on wasm \
70 without threading?)."
71 );
72}
73
74// Wasm panics if you spawn without an executor
75#[cfg(feature = "wasm-bindgen")]
76#[cold]
77#[inline(never)]
78fn no_op_spawn(_: PinnedFuture<()>) {
79 panic!(
80 "Executor::spawn called, but no global 'spawn' function is configured."
81 );
82}
83
84#[cfg(not(debug_assertions))]
85#[cold]
86#[inline(never)]
87fn no_op_spawn_local(_: PinnedLocalFuture<()>) {
88 panic!(
89 "Executor::spawn_local called, but no global 'spawn_local' function \
90 is configured."
91 );
92}
93
94/// Errors that can occur when using the executor.
95#[derive(Error, Debug)]
96pub enum ExecutorError {
97 /// The executor has already been set.
98 #[error("Global executor has already been set.")]
99 AlreadySet,
100}
101
102/// A global async executor that can spawn tasks.
103pub struct Executor;
104
105impl Executor {
106 /// Spawns a thread-safe [`Future`].
107 ///
108 /// Uses the globally configured executor.
109 /// Panics if no global executor has been initialized.
110 #[inline(always)]
111 #[track_caller]
112 pub fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
113 let pinned_fut = Box::pin(fut);
114
115 if let Some(fns) = EXECUTOR_FNS.get() {
116 (fns.spawn)(pinned_fut)
117 } else {
118 // No global executor set.
119 handle_uninitialized_spawn(pinned_fut);
120 }
121 }
122
123 /// Spawns a [`Future`] that cannot be sent across threads.
124 ///
125 /// Uses the globally configured executor.
126 /// Panics if no global executor has been initialized.
127 #[inline(always)]
128 #[track_caller]
129 pub fn spawn_local(fut: impl Future<Output = ()> + 'static) {
130 let pinned_fut = Box::pin(fut);
131
132 if let Some(fns) = EXECUTOR_FNS.get() {
133 (fns.spawn_local)(pinned_fut)
134 } else {
135 // No global executor set.
136 handle_uninitialized_spawn_local(pinned_fut);
137 }
138 }
139
140 /// Waits until the next "tick" of the current async executor.
141 /// Respects the global executor.
142 #[inline(always)]
143 pub async fn tick() {
144 let (tx, rx) = futures::channel::oneshot::channel();
145 #[cfg(not(all(feature = "wasm-bindgen", target_family = "wasm")))]
146 Executor::spawn(async move {
147 _ = tx.send(());
148 });
149 #[cfg(all(feature = "wasm-bindgen", target_family = "wasm"))]
150 Executor::spawn_local(async move {
151 _ = tx.send(());
152 });
153
154 _ = rx.await;
155 }
156
157 /// Polls the global async executor.
158 ///
159 /// Uses the globally configured executor.
160 /// Does nothing if the global executor does not support polling.
161 #[inline(always)]
162 pub fn poll_local() {
163 if let Some(fns) = EXECUTOR_FNS.get() {
164 (fns.poll_local)()
165 }
166 // If not initialized or doesn't support polling, do nothing gracefully.
167 }
168}
169
170impl Executor {
171 /// Globally sets the [`tokio`] runtime as the executor used to spawn tasks.
172 ///
173 /// Returns `Err(_)` if a global executor has already been set.
174 ///
175 /// Requires the `tokio` feature to be activated on this crate.
176 #[cfg(feature = "tokio")]
177 #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
178 pub fn init_tokio() -> Result<(), ExecutorError> {
179 let executor_impl = ExecutorFns {
180 spawn: |fut| {
181 tokio::spawn(fut);
182 },
183 spawn_local: |fut| {
184 tokio::task::spawn_local(fut);
185 },
186 // Tokio doesn't have an explicit global poll function like LocalPool::run_until_stalled
187 poll_local: no_op_poll,
188 };
189 EXECUTOR_FNS
190 .set(executor_impl)
191 .map_err(|_| ExecutorError::AlreadySet)
192 }
193
194 /// Globally sets the [`wasm-bindgen-futures`] runtime as the executor used to spawn tasks.
195 ///
196 /// Returns `Err(_)` if a global executor has already been set.
197 ///
198 /// Requires the `wasm-bindgen` feature to be activated on this crate.
199 #[cfg(feature = "wasm-bindgen")]
200 #[cfg_attr(docsrs, doc(cfg(feature = "wasm-bindgen")))]
201 pub fn init_wasm_bindgen() -> Result<(), ExecutorError> {
202 let executor_impl = ExecutorFns {
203 // wasm-bindgen-futures only supports spawn_local
204 spawn: no_op_spawn,
205 spawn_local: |fut| {
206 wasm_bindgen_futures::spawn_local(fut);
207 },
208 poll_local: no_op_poll,
209 };
210 EXECUTOR_FNS
211 .set(executor_impl)
212 .map_err(|_| ExecutorError::AlreadySet)
213 }
214
215 /// Globally sets the [`glib`] runtime as the executor used to spawn tasks.
216 ///
217 /// Returns `Err(_)` if a global executor has already been set.
218 ///
219 /// Requires the `glib` feature to be activated on this crate.
220 #[cfg(feature = "glib")]
221 #[cfg_attr(docsrs, doc(cfg(feature = "glib")))]
222 pub fn init_glib() -> Result<(), ExecutorError> {
223 let executor_impl = ExecutorFns {
224 spawn: |fut| {
225 let main_context = glib::MainContext::default();
226 main_context.spawn(fut);
227 },
228 spawn_local: |fut| {
229 let main_context = glib::MainContext::default();
230 main_context.spawn_local(fut);
231 },
232 // Glib needs event loop integration, explicit polling isn't the standard model here.
233 poll_local: no_op_poll,
234 };
235 EXECUTOR_FNS
236 .set(executor_impl)
237 .map_err(|_| ExecutorError::AlreadySet)
238 }
239
240 /// Globally sets the [`futures`] executor as the executor used to spawn tasks,
241 /// lazily creating a thread pool to spawn tasks into.
242 ///
243 /// Returns `Err(_)` if a global executor has already been set.
244 ///
245 /// Requires the `futures-executor` feature to be activated on this crate.
246 #[cfg(feature = "futures-executor")]
247 #[cfg_attr(docsrs, doc(cfg(feature = "futures-executor")))]
248 pub fn init_futures_executor() -> Result<(), ExecutorError> {
249 use futures::{
250 executor::{LocalPool, LocalSpawner, ThreadPool},
251 task::{LocalSpawnExt, SpawnExt},
252 };
253 use std::cell::RefCell;
254
255 // Keep the lazy-init ThreadPool and thread-local LocalPool for spawn_local impl
256 static THREAD_POOL: OnceLock<ThreadPool> = OnceLock::new();
257 thread_local! {
258 static LOCAL_POOL: RefCell<LocalPool> = RefCell::new(LocalPool::new());
259 // SPAWNER is derived from LOCAL_POOL, keep it for efficiency inside the closure
260 static SPAWNER: LocalSpawner = LOCAL_POOL.with(|pool| pool.borrow().spawner());
261 }
262
263 fn get_thread_pool() -> &'static ThreadPool {
264 THREAD_POOL.get_or_init(|| {
265 ThreadPool::new()
266 .expect("could not create futures executor ThreadPool")
267 })
268 }
269
270 let executor_impl = ExecutorFns {
271 spawn: |fut| {
272 get_thread_pool()
273 .spawn(fut)
274 .expect("failed to spawn future on ThreadPool");
275 },
276 spawn_local: |fut| {
277 // Use the thread_local SPAWNER derived from LOCAL_POOL
278 SPAWNER.with(|spawner| {
279 spawner
280 .spawn_local(fut)
281 .expect("failed to spawn local future");
282 });
283 },
284 poll_local: || {
285 // Use the thread_local LOCAL_POOL
286 LOCAL_POOL.with(|pool| {
287 // Use try_borrow_mut to prevent panic during re-entrant calls
288 if let Ok(mut pool) = pool.try_borrow_mut() {
289 pool.run_until_stalled();
290 }
291 // If already borrowed, we're likely in a nested poll, so do nothing.
292 });
293 },
294 };
295
296 EXECUTOR_FNS
297 .set(executor_impl)
298 .map_err(|_| ExecutorError::AlreadySet)
299 }
300
301 /// Globally sets the [`async_executor`] executor as the executor used to spawn tasks,
302 /// lazily creating a thread pool to spawn tasks into.
303 ///
304 /// Returns `Err(_)` if a global executor has already been set.
305 ///
306 /// Requires the `async-executor` feature to be activated on this crate.
307 #[cfg(feature = "async-executor")]
308 #[cfg_attr(docsrs, doc(cfg(feature = "async-executor")))]
309 pub fn init_async_executor() -> Result<(), ExecutorError> {
310 use async_executor::{Executor as AsyncExecutor, LocalExecutor};
311
312 // Keep the lazy-init global Executor and thread-local LocalExecutor for spawn_local impl
313 static ASYNC_EXECUTOR: OnceLock<AsyncExecutor<'static>> =
314 OnceLock::new();
315 thread_local! {
316 static LOCAL_EXECUTOR_POOL: LocalExecutor<'static> = const { LocalExecutor::new() };
317 }
318
319 fn get_async_executor() -> &'static AsyncExecutor<'static> {
320 ASYNC_EXECUTOR.get_or_init(AsyncExecutor::new)
321 }
322
323 let executor_impl = ExecutorFns {
324 spawn: |fut| {
325 get_async_executor().spawn(fut).detach();
326 },
327 spawn_local: |fut| {
328 LOCAL_EXECUTOR_POOL.with(|pool| pool.spawn(fut).detach());
329 },
330 poll_local: || {
331 LOCAL_EXECUTOR_POOL.with(|pool| {
332 // try_tick polls the local executor without blocking
333 // This prevents issues if called recursively or from within a task.
334 pool.try_tick();
335 });
336 },
337 };
338 EXECUTOR_FNS
339 .set(executor_impl)
340 .map_err(|_| ExecutorError::AlreadySet)
341 }
342
343 /// Globally sets a custom executor as the executor used to spawn tasks.
344 ///
345 /// Requires the custom executor to be `Send + Sync` as it will be stored statically.
346 ///
347 /// Returns `Err(_)` if a global executor has already been set.
348 pub fn init_custom_executor(
349 custom_executor: impl CustomExecutor + Send + Sync + 'static,
350 ) -> Result<(), ExecutorError> {
351 // Store the custom executor instance itself to call its methods.
352 // Use Box for dynamic dispatch.
353 static CUSTOM_EXECUTOR_INSTANCE: OnceLock<
354 Box<dyn CustomExecutor + Send + Sync>,
355 > = OnceLock::new();
356
357 CUSTOM_EXECUTOR_INSTANCE
358 .set(Box::new(custom_executor))
359 .map_err(|_| ExecutorError::AlreadySet)?;
360
361 // Now set the ExecutorFns using the stored instance
362 let executor_impl = ExecutorFns {
363 spawn: |fut| {
364 // Unwrap is safe because we just set it successfully or returned Err.
365 CUSTOM_EXECUTOR_INSTANCE.get().unwrap().spawn(fut);
366 },
367 spawn_local: |fut| {
368 CUSTOM_EXECUTOR_INSTANCE.get().unwrap().spawn_local(fut);
369 },
370 poll_local: || {
371 CUSTOM_EXECUTOR_INSTANCE.get().unwrap().poll_local();
372 },
373 };
374
375 EXECUTOR_FNS
376 .set(executor_impl)
377 .map_err(|_| ExecutorError::AlreadySet)
378 // If setting EXECUTOR_FNS fails (extremely unlikely race if called *concurrently*
379 // with another init_* after CUSTOM_EXECUTOR_INSTANCE was set), we technically
380 // leave CUSTOM_EXECUTOR_INSTANCE set but EXECUTOR_FNS not. This is an edge case,
381 // but the primary race condition is solved.
382 }
383
384 /// Sets a custom executor *for the current thread only*.
385 ///
386 /// This overrides the global executor for calls to `spawn`, `spawn_local`, and `poll_local`
387 /// made *from the current thread*. It does not affect other threads or the global state.
388 ///
389 /// The provided `custom_executor` must implement [`CustomExecutor`] and `'static`, but does
390 /// **not** need to be `Send` or `Sync`.
391 ///
392 /// Returns `Err(ExecutorError::AlreadySet)` if a *local* executor has already been set
393 /// *for this thread*.
394 pub fn init_local_custom_executor(
395 custom_executor: impl CustomExecutor + 'static,
396 ) -> Result<(), ExecutorError> {
397 // Store the custom executor instance itself to call its methods.
398 // Use Box for dynamic dispatch.
399 thread_local! {
400 static CUSTOM_EXECUTOR_INSTANCE: OnceLock<
401 Box<dyn CustomExecutor>,
402 > = OnceLock::new();
403 };
404
405 CUSTOM_EXECUTOR_INSTANCE.with(|this| {
406 this.set(Box::new(custom_executor))
407 .map_err(|_| ExecutorError::AlreadySet)
408 })?;
409
410 // Now set the ExecutorFns using the stored instance
411 let executor_impl = ExecutorFns {
412 spawn: |fut| {
413 // Unwrap is safe because we just set it successfully or returned Err.
414 CUSTOM_EXECUTOR_INSTANCE
415 .with(|this| this.get().unwrap().spawn(fut));
416 },
417 spawn_local: |fut| {
418 CUSTOM_EXECUTOR_INSTANCE
419 .with(|this| this.get().unwrap().spawn_local(fut));
420 },
421 poll_local: || {
422 CUSTOM_EXECUTOR_INSTANCE
423 .with(|this| this.get().unwrap().poll_local());
424 },
425 };
426
427 EXECUTOR_FNS
428 .set(executor_impl)
429 .map_err(|_| ExecutorError::AlreadySet)
430 }
431}
432
433/// A trait for custom executors.
434/// Custom executors can be used to integrate with any executor that supports spawning futures.
435///
436/// If used with `init_custom_executor`, the implementation must be `Send + Sync + 'static`.
437///
438/// All methods can be called recursively. Implementors should be mindful of potential
439/// deadlocks or excessive resource consumption if recursive calls are not handled carefully
440/// (e.g., using `try_borrow_mut` or non-blocking polls within implementations).
441pub trait CustomExecutor {
442 /// Spawns a future, usually on a thread pool.
443 fn spawn(&self, fut: PinnedFuture<()>);
444 /// Spawns a local future. May require calling `poll_local` to make progress.
445 fn spawn_local(&self, fut: PinnedLocalFuture<()>);
446 /// Polls the executor, if it supports polling. Implementations should ideally be
447 /// non-blocking or use mechanisms like `try_tick` or `try_borrow_mut` to handle
448 /// re-entrant calls safely.
449 fn poll_local(&self);
450}
451
452// Ensure CustomExecutor is object-safe
453#[allow(dead_code)]
454fn test_object_safety(_: Box<dyn CustomExecutor + Send + Sync>) {} // Added Send + Sync constraint here for global usage
455
456/// Handles the case where `Executor::spawn` is called without an initialized executor.
457#[cold] // Less likely path
458#[inline(never)]
459#[track_caller]
460fn handle_uninitialized_spawn(_fut: PinnedFuture<()>) {
461 let caller = std::panic::Location::caller();
462 #[cfg(all(debug_assertions, feature = "tracing"))]
463 {
464 tracing::error!(
465 target: "any_spawner",
466 spawn_caller=%caller,
467 "Executor::spawn called before a global executor was initialized. Task dropped."
468 );
469 // Drop the future implicitly after logging
470 drop(_fut);
471 }
472 #[cfg(all(debug_assertions, not(feature = "tracing")))]
473 {
474 panic!(
475 "At {caller}, tried to spawn a Future with Executor::spawn() \
476 before a global executor was initialized."
477 );
478 }
479 // In release builds (without tracing), call the specific no-op function.
480 #[cfg(not(debug_assertions))]
481 {
482 no_op_spawn(_fut);
483 }
484}
485
486/// Handles the case where `Executor::spawn_local` is called without an initialized executor.
487#[cold] // Less likely path
488#[inline(never)]
489#[track_caller]
490fn handle_uninitialized_spawn_local(_fut: PinnedLocalFuture<()>) {
491 let caller = std::panic::Location::caller();
492 #[cfg(all(debug_assertions, feature = "tracing"))]
493 {
494 tracing::error!(
495 target: "any_spawner",
496 spawn_caller=%caller,
497 "Executor::spawn_local called before a global executor was initialized. \
498 Task likely dropped or panicked."
499 );
500 // Fall through to panic or no-op depending on build/target
501 }
502 #[cfg(all(debug_assertions, not(feature = "tracing")))]
503 {
504 panic!(
505 "At {caller}, tried to spawn a Future with \
506 Executor::spawn_local() before a global executor was initialized."
507 );
508 }
509 // In release builds (without tracing), call the specific no-op function (which usually panics).
510 #[cfg(not(debug_assertions))]
511 {
512 no_op_spawn_local(_fut);
513 }
514}