1#![no_std]
60#![cfg_attr(docsrs, feature(doc_cfg))]
61#![warn(missing_docs, missing_debug_implementations)]
62
63#[cfg(feature = "async-executor")]
64#[cfg_attr(docsrs, doc(cfg(feature = "async-executor")))]
65pub mod async_executor;
66
67#[cfg(feature = "tokio")]
68#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
69pub mod tokio;
70
71#[cfg(feature = "web")]
72#[cfg_attr(docsrs, doc(cfg(feature = "web")))]
73pub mod web;
74
75use core::{
76 any::Any,
77 fmt::Debug,
78 future::Future,
79 marker::PhantomData,
80 panic::AssertUnwindSafe,
81 pin::Pin,
82 task::{Context, Poll},
83};
84
85use alloc::boxed::Box;
86use async_channel::Receiver;
87
88extern crate alloc;
89
/// An executor that can spawn `Send + 'static` futures and is itself
/// `Send + Sync`.
///
/// [`Executor::spawn`] hands a future to the executor and returns a [`Task`]
/// handle; the handle is a future yielding the spawned future's output.
///
/// # Implementations
///
/// - [`AnyExecutor`], the type-erased wrapper around any other `Executor`
#[cfg_attr(feature = "web", doc = "- [`web::WebExecutor`] for WASM environments")]
pub trait Executor: Send + Sync {
    /// Handle type returned by [`Executor::spawn`] for a future whose output
    /// is `T`. The handle itself must be `Send`.
    type Task<T: Send + 'static>: Task<T> + Send;

    /// Spawns `fut` on this executor and returns a handle to it.
    ///
    /// The future, and its output, must be `Send`; the future must also be
    /// `'static`.
    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
    where
        Fut: Future<Output: Send> + Send + 'static;
}
116
/// An executor that can spawn `'static` futures without requiring them to be
/// `Send`.
///
/// [`LocalExecutor::spawn`] hands a future to the executor and returns a
/// [`Task`] handle; the handle is a future yielding the spawned future's
/// output.
///
/// # Implementations
///
/// - [`AnyLocalExecutor`], the type-erased wrapper around any other
///   `LocalExecutor`
#[cfg_attr(feature = "web", doc = "- [`web::WebExecutor`] for WASM environments")]
pub trait LocalExecutor {
    /// Handle type returned by [`LocalExecutor::spawn`] for a future whose
    /// output is `T`.
    type Task<T: 'static>: Task<T>;

    /// Spawns `fut` on this executor and returns a handle to it.
    ///
    /// Neither the future nor its output needs to be `Send`.
    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
    where
        Fut: Future + 'static;
}
143
/// Object-safe adapter behind [`AnyLocalExecutor`].
///
/// [`LocalExecutor::spawn`] is generic and therefore cannot be called through
/// a trait object; this trait exposes a boxed, unit-output equivalent. The
/// `Any` supertrait is what lets the erased executor be downcast again.
trait AnyLocalExecutorImpl: 'static + Any {
    /// Spawns an already boxed future with output `()` and returns the
    /// resulting task as a pinned trait object.
    fn spawn_boxed(
        &self,
        fut: Pin<Box<dyn Future<Output = ()>>>,
    ) -> Pin<Box<dyn Task<()> + 'static>>;
}
150
151impl<E> AnyLocalExecutorImpl for E
152where
153 E: LocalExecutor + 'static,
154{
155 fn spawn_boxed(
156 &self,
157 fut: Pin<Box<dyn Future<Output = ()>>>,
158 ) -> Pin<Box<dyn Task<()> + 'static>> {
159 let task = self.spawn(fut);
160 Box::pin(task)
161 }
162}
163
/// A type-erased [`LocalExecutor`].
///
/// Wraps any `'static` local executor in a box; the concrete type can be
/// recovered with [`AnyLocalExecutor::downcast_ref`] or
/// [`AnyLocalExecutor::downcast`].
pub struct AnyLocalExecutor(Box<dyn AnyLocalExecutorImpl>);
169
170impl Debug for AnyLocalExecutor {
171 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
172 f.debug_struct("AnyLocalExecutor").finish()
173 }
174}
175
176impl Debug for AnyExecutor {
177 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
178 f.debug_struct("AnyExecutor").finish()
179 }
180}
181
182impl AnyExecutor {
183 pub fn new(executor: impl Executor + 'static) -> Self {
185 Self(Box::new(executor))
186 }
187
188 pub fn downcast_ref<E: Executor + 'static>(&self) -> Option<&E> {
192 let any: &dyn Any = self.0.as_ref();
193
194 any.downcast_ref()
195 }
196
197 pub fn downcast<E: Executor + 'static>(self) -> Result<Box<E>, Self> {
202 if (&self.0 as &dyn Any).is::<E>() {
203 Ok((self.0 as Box<dyn Any>).downcast().ok().unwrap())
204 } else {
205 Err(self)
206 }
207 }
208}
209
210impl AnyLocalExecutor {
211 pub fn new(executor: impl LocalExecutor + 'static) -> Self {
213 Self(Box::new(executor))
214 }
215
216 pub fn downcast_ref<E: LocalExecutor + 'static>(&self) -> Option<&E> {
220 let any: &dyn Any = self.0.as_ref();
221
222 any.downcast_ref()
223 }
224
225 pub fn downcast<E: LocalExecutor + 'static>(self) -> Result<Box<E>, Self> {
230 if (&self.0 as &dyn Any).is::<E>() {
231 Ok((self.0 as Box<dyn Any>).downcast().ok().unwrap())
232 } else {
233 Err(self)
234 }
235 }
236}
237
/// Task handle returned by [`AnyLocalExecutor`]'s `spawn`.
///
/// The wrapped executor only ever sees a unit-output future, so the real
/// output of type `T` is delivered separately through a channel.
pub struct AnyLocalExecutorTask<T> {
    /// Type-erased task returned by the wrapped executor for the unit-output
    /// wrapper future.
    inner: Pin<Box<dyn Task<()> + 'static>>,
    /// Receiving end of the channel on which the wrapper future sends the
    /// spawned future's output.
    receiver: Receiver<Result<T, Error>>,
}
247
248impl<T> core::fmt::Debug for AnyLocalExecutorTask<T> {
249 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
250 f.debug_struct("AnyLocalExecutorTask")
251 .finish_non_exhaustive()
252 }
253}
254
255impl<T> Future for AnyLocalExecutorTask<T> {
256 type Output = T;
257
258 fn poll(
259 self: Pin<&mut Self>,
260 cx: &mut core::task::Context<'_>,
261 ) -> core::task::Poll<Self::Output> {
262 self.poll_result(cx).map(|res| res.unwrap())
263 }
264}
265
266impl<T> Task<T> for AnyLocalExecutorTask<T> {
267 fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
268 let this = unsafe { self.get_unchecked_mut() };
270 let _ = this.inner.as_mut().poll(cx);
271
272 let mut recv = this.receiver.recv();
274 unsafe {
275 Pin::new_unchecked(&mut recv)
276 .poll(cx)
277 .map(|res| res.unwrap_or_else(|_| Err(Box::new("Channel closed"))))
278 }
279 }
280 fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
281 let this = unsafe { self.get_unchecked_mut() };
282 this.inner.as_mut().poll_cancel(cx)
283 }
284}
285
286impl LocalExecutor for AnyLocalExecutor {
287 type Task<T: 'static> = AnyLocalExecutorTask<T>;
288
289 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
290 where
291 Fut: Future + 'static,
292 {
293 let (sender, receiver) = async_channel::bounded(1);
294 let fut = async move {
295 let res = AssertUnwindSafe(fut).await;
296 let _ = sender.send(Ok(res)).await;
297 };
298 let inner = self.0.spawn_boxed(Box::pin(fut));
299 AnyLocalExecutorTask { inner, receiver }
300 }
301}
302
/// Type-erased failure value returned by [`Task::poll_result`].
///
/// NOTE(review): presumably this carries the panic payload of a failed task
/// (it has the same shape as the error of `std::panic::catch_unwind`) —
/// confirm against the executor implementations. Within this file it is also
/// used to box the string `"Channel closed"`.
type Error = Box<dyn core::any::Any + Send>;
307
/// Handle to a spawned future.
///
/// A `Task<T>` is itself a [`Future`] yielding `T`. The additional methods
/// expose the outcome as a `Result` and allow the task to be cancelled.
pub trait Task<T>: Future<Output = T> {
    /// Polls the task for its outcome, reported as a `Result` instead of a
    /// bare `T`.
    ///
    /// NOTE(review): what exactly produces an `Err` is decided by each
    /// implementation — presumably a panic inside the spawned future; confirm.
    fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>>;

    /// Polls the cancellation of the task; `Poll::Ready(())` signals that
    /// cancellation has completed.
    ///
    /// NOTE(review): the precise cancellation semantics are defined by each
    /// implementation.
    fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;

    /// Consumes the task and returns a future that resolves to the value
    /// produced by [`Task::poll_result`].
    fn result(self) -> impl Future<Output = Result<T, Error>>
    where
        Self: Sized,
    {
        ResultFuture {
            task: self,
            _phantom: PhantomData,
        }
    }

    /// Consumes the task and returns a future that resolves once
    /// [`Task::poll_cancel`] reports completion.
    fn cancel(self) -> impl Future<Output = ()>
    where
        Self: Sized,
    {
        CancelFuture {
            task: self,
            _phantom: PhantomData,
        }
    }
}
358
/// Future created by [`Task::result`]; resolves to `Result<U, Error>` by
/// repeatedly calling [`Task::poll_result`] on the wrapped task.
pub struct ResultFuture<T: Task<U>, U> {
    /// The task being driven.
    task: T,
    /// Ties the otherwise unused output type `U` to the struct.
    _phantom: PhantomData<U>,
}
367
368impl<T: Task<U>, U> core::fmt::Debug for ResultFuture<T, U> {
369 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
370 f.debug_struct("ResultFuture").finish_non_exhaustive()
371 }
372}
373
374impl<T: Task<U>, U> Future for ResultFuture<T, U> {
375 type Output = Result<U, Error>;
376
377 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
378 let this = unsafe { self.get_unchecked_mut() };
379 unsafe { Pin::new_unchecked(&mut this.task) }.poll_result(cx)
380 }
381}
382
/// Future created by [`Task::cancel`]; resolves to `()` by repeatedly calling
/// [`Task::poll_cancel`] on the wrapped task.
pub struct CancelFuture<T: Task<U>, U> {
    /// The task being cancelled.
    task: T,
    /// Ties the otherwise unused output type `U` to the struct.
    _phantom: PhantomData<U>,
}
392
393impl<T: Task<U>, U> core::fmt::Debug for CancelFuture<T, U> {
394 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
395 f.debug_struct("CancelFuture").finish_non_exhaustive()
396 }
397}
398
399impl<T: Task<U>, U> Future for CancelFuture<T, U> {
400 type Output = ();
401
402 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
403 let this = unsafe { self.get_unchecked_mut() };
404 unsafe { Pin::new_unchecked(&mut this.task) }.poll_cancel(cx)
405 }
406}
407
/// A type-erased [`Executor`].
///
/// Wraps any `'static` executor in a box; the concrete type can be recovered
/// with [`AnyExecutor::downcast_ref`] or [`AnyExecutor::downcast`].
pub struct AnyExecutor(Box<dyn AnyExecutorImpl>);
414
/// Task handle returned by [`AnyExecutor`]'s `spawn`.
///
/// The wrapped executor only ever sees a unit-output future, so the real
/// output of type `T` is delivered separately through a channel.
pub struct AnyExecutorTask<T> {
    /// Type-erased task returned by the wrapped executor for the unit-output
    /// wrapper future.
    inner: Pin<Box<dyn Task<()> + Send>>,
    /// Receiving end of the channel on which the wrapper future sends the
    /// spawned future's output.
    receiver: Receiver<Result<T, Error>>,
}
423
424impl<T> core::fmt::Debug for AnyExecutorTask<T> {
425 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
426 f.debug_struct("AnyExecutorTask").finish_non_exhaustive()
427 }
428}
429
430impl<T: Send> Future for AnyExecutorTask<T> {
431 type Output = T;
432
433 fn poll(
434 self: Pin<&mut Self>,
435 cx: &mut core::task::Context<'_>,
436 ) -> core::task::Poll<Self::Output> {
437 self.poll_result(cx).map(|res| res.unwrap())
438 }
439}
440
441impl<T: Send> Task<T> for AnyExecutorTask<T> {
442 fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
443 let this = unsafe { self.get_unchecked_mut() };
445 let _ = this.inner.as_mut().poll(cx);
446
447 let mut recv = this.receiver.recv();
449 unsafe {
450 Pin::new_unchecked(&mut recv)
451 .poll(cx)
452 .map(|res| res.unwrap_or_else(|_| Err(Box::new("Channel closed"))))
453 }
454 }
455 fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
456 let this = unsafe { self.get_unchecked_mut() };
457 this.inner.as_mut().poll_cancel(cx)
458 }
459}
460
461impl Executor for AnyExecutor {
462 type Task<T: Send + 'static> = AnyExecutorTask<T>;
463
464 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
465 where
466 Fut: Future<Output: Send> + Send + 'static,
467 {
468 let (sender, receiver) = async_channel::bounded(1);
469 let fut = async move {
470 let res = AssertUnwindSafe(fut).await;
471 let _ = sender.send(Ok(res)).await;
472 };
473 let inner = self.0.spawn_boxed(Box::pin(fut));
474 AnyExecutorTask { inner, receiver }
475 }
476}
477
/// Object-safe adapter behind [`AnyExecutor`].
///
/// [`Executor::spawn`] is generic and therefore cannot be called through a
/// trait object; this trait exposes a boxed, unit-output equivalent. The
/// `Any` supertrait is what lets the erased executor be downcast again.
trait AnyExecutorImpl: Send + Sync + Any {
    /// Spawns an already boxed `Send` future with output `()` and returns the
    /// resulting task as a pinned, `Send` trait object.
    fn spawn_boxed(
        &self,
        fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> Pin<Box<dyn Task<()> + Send>>;
}
484
485impl<T: Task<T>> Task<T> for Pin<Box<T>> {
486 fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
487 let this = unsafe { self.get_unchecked_mut() };
488 this.as_mut().poll_result(cx)
489 }
490 fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
491 let this = unsafe { self.get_unchecked_mut() };
492
493 this.as_mut().poll_cancel(cx)
494 }
495}
496
497impl<E> AnyExecutorImpl for E
498where
499 E: Executor + 'static,
500{
501 fn spawn_boxed(
502 &self,
503 fut: Pin<Box<dyn Future<Output = ()> + Send>>,
504 ) -> Pin<Box<dyn Task<()> + Send>> {
505 let task = self.spawn(fut);
506 Box::pin(task)
507 }
508}
509
510mod std_on {
511 use alloc::boxed::Box;
512
513 use crate::{
514 AnyExecutor, AnyExecutorTask, AnyLocalExecutor, AnyLocalExecutorTask, Executor,
515 LocalExecutor,
516 };
517
518 extern crate std;
519
520 use core::{cell::OnceCell, future::Future, panic::AssertUnwindSafe};
521 use std::sync::OnceLock;
522 std::thread_local! {
523 static LOCAL_EXECUTOR: OnceCell<AnyLocalExecutor> = const { OnceCell::new() };
524 }
525
526 pub fn init_local_executor(executor: impl LocalExecutor + 'static) {
536 if try_init_local_executor(executor).is_err() {
537 panic!("Local executor already set for this thread");
538 }
539 }
540
541 pub fn try_init_local_executor<E>(executor: E) -> Result<(), E>
545 where
546 E: LocalExecutor + 'static,
547 {
548 LOCAL_EXECUTOR.with(|cell| {
549 cell.set(AnyLocalExecutor::new(executor))
550 .map_err(|e| *e.downcast().unwrap())
551 })
552 }
553
554 static GLOBAL_EXECUTOR: OnceLock<AnyExecutor> = OnceLock::new();
555
556 pub fn init_global_executor(executor: impl crate::Executor + 'static) {
566 if GLOBAL_EXECUTOR.set(AnyExecutor::new(executor)).is_err() {
567 panic!("Global executor already set");
568 }
569 }
570
571 pub fn try_init_global_executor<E>(executor: E) -> Result<(), E>
575 where
576 E: crate::Executor + 'static,
577 {
578 GLOBAL_EXECUTOR
579 .set(AnyExecutor::new(executor))
580 .map_err(|e| *e.downcast().unwrap())
581 }
582
583 pub fn spawn<Fut>(fut: Fut) -> AnyExecutorTask<Fut::Output>
593 where
594 Fut: Future<Output: Send> + Send + 'static,
595 {
596 let executor = GLOBAL_EXECUTOR.get().expect("Global executor not set");
597 executor.spawn(fut)
598 }
599
600 pub fn spawn_local<Fut>(fut: Fut) -> AnyLocalExecutorTask<Fut::Output>
611 where
612 Fut: Future + 'static,
613 {
614 LOCAL_EXECUTOR.with(|cell| {
615 let executor = cell.get().expect("Local executor not set");
616 executor.spawn(fut)
617 })
618 }
619
620 #[allow(unused)]
621 pub(crate) fn catch_unwind<F, R>(f: F) -> Result<R, Box<dyn std::any::Any + Send>>
622 where
623 F: FnOnce() -> R,
624 {
625 std::panic::catch_unwind(AssertUnwindSafe(f))
626 }
627}
628
629pub use std_on::*;
630
631#[cfg(feature = "async-executor")]
633#[cfg_attr(docsrs, doc(cfg(feature = "async-executor")))]
634pub use async_executor::AsyncTask;
635
636#[cfg(feature = "tokio")]
638#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
639pub use tokio::{DefaultExecutor, TokioLocalTask, TokioTask};
640
641#[cfg(feature = "web")]
643#[cfg_attr(docsrs, doc(cfg(feature = "web")))]
644pub use web::{WebExecutor, WebTask};