any_spawner/
lib.rs

//! This crate makes it easier to write asynchronous code that is executor-agnostic, by providing a
//! utility that can be used to spawn tasks in a variety of executors.
//!
//! It only supports a single executor per program, but that executor can be set at runtime,
//! anywhere in your crate (or an application that depends on it).
//!
//! This can be extended to support any executor or runtime that supports spawning [`Future`]s.
//!
//! This is a least-common-denominator implementation in many ways. Limitations include:
//! - setting an executor is a one-time, global action
//! - no "join handle" or other result is returned from the spawn
//! - the `Future` must output `()`
//!
//! ```no_run
//! use any_spawner::Executor;
//!
//! // make sure an Executor has been initialized with one of the init_ functions
//!
//! // spawn a thread-safe Future
//! Executor::spawn(async { /* ... */ });
//!
//! // spawn a Future that is !Send
//! Executor::spawn_local(async { /* ... */ });
//! ```
25
26#![forbid(unsafe_code)]
27#![deny(missing_docs)]
28#![cfg_attr(docsrs, feature(doc_cfg))]
29
30use std::{future::Future, pin::Pin, sync::OnceLock};
31use thiserror::Error;
32
/// A pinned, boxed, type-erased future that is [`Send`], so it may be handed to an executor
/// that runs it on another thread.
pub type PinnedFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
/// A pinned, boxed, type-erased future that is **not** required to be [`Send`]; it cannot be
/// moved to another thread.
pub type PinnedLocalFuture<T> = Pin<Box<dyn Future<Output = T>>>;
37
38// Type alias for the spawn function pointer.
39type SpawnFn = fn(PinnedFuture<()>);
40// Type alias for the spawn_local function pointer.
41type SpawnLocalFn = fn(PinnedLocalFuture<()>);
42// Type alias for the poll_local function pointer.
43type PollLocalFn = fn();
44
45/// Holds the function pointers for the current global executor.
46#[derive(Clone, Copy)]
47struct ExecutorFns {
48    spawn: SpawnFn,
49    spawn_local: SpawnLocalFn,
50    poll_local: PollLocalFn,
51}
52
53// Use a single OnceLock to ensure atomic initialization of all functions.
54static EXECUTOR_FNS: OnceLock<ExecutorFns> = OnceLock::new();
55
/// `poll_local` implementation for executors that have nothing to poll explicitly; it
/// deliberately does nothing.
#[cfg(any(feature = "tokio", feature = "wasm-bindgen", feature = "glib"))]
#[cold]
#[inline(never)]
fn no_op_poll() {}
61
/// Fallback `spawn` for release builds without the `wasm-bindgen` feature: the future is
/// dropped without being run.
// This variant is only compiled without `debug_assertions`, so it carries no debug-only
// diagnostics: they could never be compiled in.
#[cfg(all(not(feature = "wasm-bindgen"), not(debug_assertions)))]
#[cold]
#[inline(never)]
fn no_op_spawn(_: PinnedFuture<()>) {}

/// Fallback `spawn` when the `wasm-bindgen` feature is enabled.
///
/// # Panics
///
/// Always panics: there is no thread-safe `spawn` to fall back to, so silently dropping the
/// future would hide the problem.
#[cfg(feature = "wasm-bindgen")]
#[cold]
#[inline(never)]
fn no_op_spawn(_: PinnedFuture<()>) {
    panic!(
        "Executor::spawn called, but no global 'spawn' function is configured."
    );
}
83
/// Release-build fallback for `spawn_local` when no executor is available.
///
/// # Panics
///
/// Always panics; the future is never run.
#[cfg(not(debug_assertions))]
#[cold]
#[inline(never)]
fn no_op_spawn_local(_: PinnedLocalFuture<()>) {
    panic!(
        "Executor::spawn_local called, but no global 'spawn_local' function \
         is configured."
    );
}
93
94/// Errors that can occur when using the executor.
95#[derive(Error, Debug)]
96pub enum ExecutorError {
97    /// The executor has already been set.
98    #[error("Global executor has already been set.")]
99    AlreadySet,
100}
101
/// Handle to the global async executor.
///
/// This is a unit type: tasks are spawned through its associated functions rather than
/// through an instance.
pub struct Executor;
104
105impl Executor {
106    /// Spawns a thread-safe [`Future`].
107    ///
108    /// Uses the globally configured executor.
109    /// Panics if no global executor has been initialized.
110    #[inline(always)]
111    #[track_caller]
112    pub fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
113        let pinned_fut = Box::pin(fut);
114
115        if let Some(fns) = EXECUTOR_FNS.get() {
116            (fns.spawn)(pinned_fut)
117        } else {
118            // No global executor set.
119            handle_uninitialized_spawn(pinned_fut);
120        }
121    }
122
123    /// Spawns a [`Future`] that cannot be sent across threads.
124    ///
125    /// Uses the globally configured executor.
126    /// Panics if no global executor has been initialized.
127    #[inline(always)]
128    #[track_caller]
129    pub fn spawn_local(fut: impl Future<Output = ()> + 'static) {
130        let pinned_fut = Box::pin(fut);
131
132        if let Some(fns) = EXECUTOR_FNS.get() {
133            (fns.spawn_local)(pinned_fut)
134        } else {
135            // No global executor set.
136            handle_uninitialized_spawn_local(pinned_fut);
137        }
138    }
139
140    /// Waits until the next "tick" of the current async executor.
141    /// Respects the global executor.
142    #[inline(always)]
143    pub async fn tick() {
144        let (tx, rx) = futures::channel::oneshot::channel();
145        #[cfg(not(all(feature = "wasm-bindgen", target_family = "wasm")))]
146        Executor::spawn(async move {
147            _ = tx.send(());
148        });
149        #[cfg(all(feature = "wasm-bindgen", target_family = "wasm"))]
150        Executor::spawn_local(async move {
151            _ = tx.send(());
152        });
153
154        _ = rx.await;
155    }
156
157    /// Polls the global async executor.
158    ///
159    /// Uses the globally configured executor.
160    /// Does nothing if the global executor does not support polling.
161    #[inline(always)]
162    pub fn poll_local() {
163        if let Some(fns) = EXECUTOR_FNS.get() {
164            (fns.poll_local)()
165        }
166        // If not initialized or doesn't support polling, do nothing gracefully.
167    }
168}
169
170impl Executor {
171    /// Globally sets the [`tokio`] runtime as the executor used to spawn tasks.
172    ///
173    /// Returns `Err(_)` if a global executor has already been set.
174    ///
175    /// Requires the `tokio` feature to be activated on this crate.
176    #[cfg(feature = "tokio")]
177    #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
178    pub fn init_tokio() -> Result<(), ExecutorError> {
179        let executor_impl = ExecutorFns {
180            spawn: |fut| {
181                tokio::spawn(fut);
182            },
183            spawn_local: |fut| {
184                tokio::task::spawn_local(fut);
185            },
186            // Tokio doesn't have an explicit global poll function like LocalPool::run_until_stalled
187            poll_local: no_op_poll,
188        };
189        EXECUTOR_FNS
190            .set(executor_impl)
191            .map_err(|_| ExecutorError::AlreadySet)
192    }
193
194    /// Globally sets the [`wasm-bindgen-futures`] runtime as the executor used to spawn tasks.
195    ///
196    /// Returns `Err(_)` if a global executor has already been set.
197    ///
198    /// Requires the `wasm-bindgen` feature to be activated on this crate.
199    #[cfg(feature = "wasm-bindgen")]
200    #[cfg_attr(docsrs, doc(cfg(feature = "wasm-bindgen")))]
201    pub fn init_wasm_bindgen() -> Result<(), ExecutorError> {
202        let executor_impl = ExecutorFns {
203            // wasm-bindgen-futures only supports spawn_local
204            spawn: no_op_spawn,
205            spawn_local: |fut| {
206                wasm_bindgen_futures::spawn_local(fut);
207            },
208            poll_local: no_op_poll,
209        };
210        EXECUTOR_FNS
211            .set(executor_impl)
212            .map_err(|_| ExecutorError::AlreadySet)
213    }
214
215    /// Globally sets the [`glib`] runtime as the executor used to spawn tasks.
216    ///
217    /// Returns `Err(_)` if a global executor has already been set.
218    ///
219    /// Requires the `glib` feature to be activated on this crate.
220    #[cfg(feature = "glib")]
221    #[cfg_attr(docsrs, doc(cfg(feature = "glib")))]
222    pub fn init_glib() -> Result<(), ExecutorError> {
223        let executor_impl = ExecutorFns {
224            spawn: |fut| {
225                let main_context = glib::MainContext::default();
226                main_context.spawn(fut);
227            },
228            spawn_local: |fut| {
229                let main_context = glib::MainContext::default();
230                main_context.spawn_local(fut);
231            },
232            // Glib needs event loop integration, explicit polling isn't the standard model here.
233            poll_local: no_op_poll,
234        };
235        EXECUTOR_FNS
236            .set(executor_impl)
237            .map_err(|_| ExecutorError::AlreadySet)
238    }
239
240    /// Globally sets the [`futures`] executor as the executor used to spawn tasks,
241    /// lazily creating a thread pool to spawn tasks into.
242    ///
243    /// Returns `Err(_)` if a global executor has already been set.
244    ///
245    /// Requires the `futures-executor` feature to be activated on this crate.
246    #[cfg(feature = "futures-executor")]
247    #[cfg_attr(docsrs, doc(cfg(feature = "futures-executor")))]
248    pub fn init_futures_executor() -> Result<(), ExecutorError> {
249        use futures::{
250            executor::{LocalPool, LocalSpawner, ThreadPool},
251            task::{LocalSpawnExt, SpawnExt},
252        };
253        use std::cell::RefCell;
254
255        // Keep the lazy-init ThreadPool and thread-local LocalPool for spawn_local impl
256        static THREAD_POOL: OnceLock<ThreadPool> = OnceLock::new();
257        thread_local! {
258            static LOCAL_POOL: RefCell<LocalPool> = RefCell::new(LocalPool::new());
259            // SPAWNER is derived from LOCAL_POOL, keep it for efficiency inside the closure
260            static SPAWNER: LocalSpawner = LOCAL_POOL.with(|pool| pool.borrow().spawner());
261        }
262
263        fn get_thread_pool() -> &'static ThreadPool {
264            THREAD_POOL.get_or_init(|| {
265                ThreadPool::new()
266                    .expect("could not create futures executor ThreadPool")
267            })
268        }
269
270        let executor_impl = ExecutorFns {
271            spawn: |fut| {
272                get_thread_pool()
273                    .spawn(fut)
274                    .expect("failed to spawn future on ThreadPool");
275            },
276            spawn_local: |fut| {
277                // Use the thread_local SPAWNER derived from LOCAL_POOL
278                SPAWNER.with(|spawner| {
279                    spawner
280                        .spawn_local(fut)
281                        .expect("failed to spawn local future");
282                });
283            },
284            poll_local: || {
285                // Use the thread_local LOCAL_POOL
286                LOCAL_POOL.with(|pool| {
287                    // Use try_borrow_mut to prevent panic during re-entrant calls
288                    if let Ok(mut pool) = pool.try_borrow_mut() {
289                        pool.run_until_stalled();
290                    }
291                    // If already borrowed, we're likely in a nested poll, so do nothing.
292                });
293            },
294        };
295
296        EXECUTOR_FNS
297            .set(executor_impl)
298            .map_err(|_| ExecutorError::AlreadySet)
299    }
300
301    /// Globally sets the [`async_executor`] executor as the executor used to spawn tasks,
302    /// lazily creating a thread pool to spawn tasks into.
303    ///
304    /// Returns `Err(_)` if a global executor has already been set.
305    ///
306    /// Requires the `async-executor` feature to be activated on this crate.
307    #[cfg(feature = "async-executor")]
308    #[cfg_attr(docsrs, doc(cfg(feature = "async-executor")))]
309    pub fn init_async_executor() -> Result<(), ExecutorError> {
310        use async_executor::{Executor as AsyncExecutor, LocalExecutor};
311
312        // Keep the lazy-init global Executor and thread-local LocalExecutor for spawn_local impl
313        static ASYNC_EXECUTOR: OnceLock<AsyncExecutor<'static>> =
314            OnceLock::new();
315        thread_local! {
316            static LOCAL_EXECUTOR_POOL: LocalExecutor<'static> = const { LocalExecutor::new() };
317        }
318
319        fn get_async_executor() -> &'static AsyncExecutor<'static> {
320            ASYNC_EXECUTOR.get_or_init(AsyncExecutor::new)
321        }
322
323        let executor_impl = ExecutorFns {
324            spawn: |fut| {
325                get_async_executor().spawn(fut).detach();
326            },
327            spawn_local: |fut| {
328                LOCAL_EXECUTOR_POOL.with(|pool| pool.spawn(fut).detach());
329            },
330            poll_local: || {
331                LOCAL_EXECUTOR_POOL.with(|pool| {
332                    // try_tick polls the local executor without blocking
333                    // This prevents issues if called recursively or from within a task.
334                    pool.try_tick();
335                });
336            },
337        };
338        EXECUTOR_FNS
339            .set(executor_impl)
340            .map_err(|_| ExecutorError::AlreadySet)
341    }
342
343    /// Globally sets a custom executor as the executor used to spawn tasks.
344    ///
345    /// Requires the custom executor to be `Send + Sync` as it will be stored statically.
346    ///
347    /// Returns `Err(_)` if a global executor has already been set.
348    pub fn init_custom_executor(
349        custom_executor: impl CustomExecutor + Send + Sync + 'static,
350    ) -> Result<(), ExecutorError> {
351        // Store the custom executor instance itself to call its methods.
352        // Use Box for dynamic dispatch.
353        static CUSTOM_EXECUTOR_INSTANCE: OnceLock<
354            Box<dyn CustomExecutor + Send + Sync>,
355        > = OnceLock::new();
356
357        CUSTOM_EXECUTOR_INSTANCE
358            .set(Box::new(custom_executor))
359            .map_err(|_| ExecutorError::AlreadySet)?;
360
361        // Now set the ExecutorFns using the stored instance
362        let executor_impl = ExecutorFns {
363            spawn: |fut| {
364                // Unwrap is safe because we just set it successfully or returned Err.
365                CUSTOM_EXECUTOR_INSTANCE.get().unwrap().spawn(fut);
366            },
367            spawn_local: |fut| {
368                CUSTOM_EXECUTOR_INSTANCE.get().unwrap().spawn_local(fut);
369            },
370            poll_local: || {
371                CUSTOM_EXECUTOR_INSTANCE.get().unwrap().poll_local();
372            },
373        };
374
375        EXECUTOR_FNS
376            .set(executor_impl)
377            .map_err(|_| ExecutorError::AlreadySet)
378        // If setting EXECUTOR_FNS fails (extremely unlikely race if called *concurrently*
379        // with another init_* after CUSTOM_EXECUTOR_INSTANCE was set), we technically
380        // leave CUSTOM_EXECUTOR_INSTANCE set but EXECUTOR_FNS not. This is an edge case,
381        // but the primary race condition is solved.
382    }
383
384    /// Sets a custom executor *for the current thread only*.
385    ///
386    /// This overrides the global executor for calls to `spawn`, `spawn_local`, and `poll_local`
387    /// made *from the current thread*. It does not affect other threads or the global state.
388    ///
389    /// The provided `custom_executor` must implement [`CustomExecutor`] and `'static`, but does
390    /// **not** need to be `Send` or `Sync`.
391    ///
392    /// Returns `Err(ExecutorError::AlreadySet)` if a *local* executor has already been set
393    /// *for this thread*.
394    pub fn init_local_custom_executor(
395        custom_executor: impl CustomExecutor + 'static,
396    ) -> Result<(), ExecutorError> {
397        // Store the custom executor instance itself to call its methods.
398        // Use Box for dynamic dispatch.
399        thread_local! {
400            static CUSTOM_EXECUTOR_INSTANCE: OnceLock<
401                Box<dyn CustomExecutor>,
402            > = OnceLock::new();
403        };
404
405        CUSTOM_EXECUTOR_INSTANCE.with(|this| {
406            this.set(Box::new(custom_executor))
407                .map_err(|_| ExecutorError::AlreadySet)
408        })?;
409
410        // Now set the ExecutorFns using the stored instance
411        let executor_impl = ExecutorFns {
412            spawn: |fut| {
413                // Unwrap is safe because we just set it successfully or returned Err.
414                CUSTOM_EXECUTOR_INSTANCE
415                    .with(|this| this.get().unwrap().spawn(fut));
416            },
417            spawn_local: |fut| {
418                CUSTOM_EXECUTOR_INSTANCE
419                    .with(|this| this.get().unwrap().spawn_local(fut));
420            },
421            poll_local: || {
422                CUSTOM_EXECUTOR_INSTANCE
423                    .with(|this| this.get().unwrap().poll_local());
424            },
425        };
426
427        EXECUTOR_FNS
428            .set(executor_impl)
429            .map_err(|_| ExecutorError::AlreadySet)
430    }
431}
432
433/// A trait for custom executors.
434/// Custom executors can be used to integrate with any executor that supports spawning futures.
435///
436/// If used with `init_custom_executor`, the implementation must be `Send + Sync + 'static`.
437///
438/// All methods can be called recursively. Implementors should be mindful of potential
439/// deadlocks or excessive resource consumption if recursive calls are not handled carefully
440/// (e.g., using `try_borrow_mut` or non-blocking polls within implementations).
441pub trait CustomExecutor {
442    /// Spawns a future, usually on a thread pool.
443    fn spawn(&self, fut: PinnedFuture<()>);
444    /// Spawns a local future. May require calling `poll_local` to make progress.
445    fn spawn_local(&self, fut: PinnedLocalFuture<()>);
446    /// Polls the executor, if it supports polling. Implementations should ideally be
447    /// non-blocking or use mechanisms like `try_tick` or `try_borrow_mut` to handle
448    /// re-entrant calls safely.
449    fn poll_local(&self);
450}
451
452// Ensure CustomExecutor is object-safe
453#[allow(dead_code)]
454fn test_object_safety(_: Box<dyn CustomExecutor + Send + Sync>) {} // Added Send + Sync constraint here for global usage
455
456/// Handles the case where `Executor::spawn` is called without an initialized executor.
457#[cold] // Less likely path
458#[inline(never)]
459#[track_caller]
460fn handle_uninitialized_spawn(_fut: PinnedFuture<()>) {
461    let caller = std::panic::Location::caller();
462    #[cfg(all(debug_assertions, feature = "tracing"))]
463    {
464        tracing::error!(
465            target: "any_spawner",
466            spawn_caller=%caller,
467            "Executor::spawn called before a global executor was initialized. Task dropped."
468        );
469        // Drop the future implicitly after logging
470        drop(_fut);
471    }
472    #[cfg(all(debug_assertions, not(feature = "tracing")))]
473    {
474        panic!(
475            "At {caller}, tried to spawn a Future with Executor::spawn() \
476             before a global executor was initialized."
477        );
478    }
479    // In release builds (without tracing), call the specific no-op function.
480    #[cfg(not(debug_assertions))]
481    {
482        no_op_spawn(_fut);
483    }
484}
485
486/// Handles the case where `Executor::spawn_local` is called without an initialized executor.
487#[cold] // Less likely path
488#[inline(never)]
489#[track_caller]
490fn handle_uninitialized_spawn_local(_fut: PinnedLocalFuture<()>) {
491    let caller = std::panic::Location::caller();
492    #[cfg(all(debug_assertions, feature = "tracing"))]
493    {
494        tracing::error!(
495            target: "any_spawner",
496            spawn_caller=%caller,
497            "Executor::spawn_local called before a global executor was initialized. \
498            Task likely dropped or panicked."
499        );
500        // Fall through to panic or no-op depending on build/target
501    }
502    #[cfg(all(debug_assertions, not(feature = "tracing")))]
503    {
504        panic!(
505            "At {caller}, tried to spawn a Future with \
506             Executor::spawn_local() before a global executor was initialized."
507        );
508    }
509    // In release builds (without tracing), call the specific no-op function (which usually panics).
510    #[cfg(not(debug_assertions))]
511    {
512        no_op_spawn_local(_fut);
513    }
514}