1#![no_std]
60#![cfg_attr(docsrs, feature(doc_cfg))]
61#![warn(missing_docs, missing_debug_implementations)]
62
63#[cfg(feature = "async-executor")]
64#[cfg_attr(docsrs, doc(cfg(feature = "async-executor")))]
65pub mod async_executor;
66
67#[cfg(feature = "tokio")]
68#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
69pub mod tokio;
70
71#[cfg(feature = "web")]
72#[cfg_attr(docsrs, doc(cfg(feature = "web")))]
73pub mod web;
74
75use core::{
76 any::Any,
77 fmt::Debug,
78 future::Future,
79 marker::PhantomData,
80 panic::AssertUnwindSafe,
81 pin::Pin,
82 task::{Context, Poll},
83};
84
85use alloc::boxed::Box;
86use async_channel::Receiver;
87
88extern crate alloc;
89
/// An executor that can spawn thread-safe (`Send`) futures.
///
/// [`spawn`](Executor::spawn) accepts any `'static` future that is `Send` and whose
/// output is `Send`, and hands back a [`Task`] handle of type
/// [`Self::Task`](Executor::Task). Implementors must themselves be `Send + Sync`.
///
/// Bundled implementations (each listed only when its crate feature is enabled):
///
#[cfg_attr(feature = "web", doc = "- [`web::WebExecutor`] for WASM environments")]
pub trait Executor: Send + Sync {
    /// Handle returned by [`spawn`](Executor::spawn): a [`Task`] yielding `T` that is
    /// itself `Send`.
    type Task<T: Send + 'static>: Task<T> + Send;

    /// Spawns `fut` on this executor and returns the handle for it.
    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
    where
        Fut: Future<Output: Send> + Send + 'static;
}
116
117impl<E: Executor> Executor for &E {
118 type Task<T: Send + 'static> = E::Task<T>;
119
120 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
121 where
122 Fut: Future<Output: Send> + Send + 'static,
123 {
124 (*self).spawn(fut)
125 }
126}
127
128impl<E: Executor> Executor for &mut E {
129 type Task<T: Send + 'static> = E::Task<T>;
130
131 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
132 where
133 Fut: Future<Output: Send> + Send + 'static,
134 {
135 (**self).spawn(fut)
136 }
137}
138
139impl<E: Executor> Executor for Box<E> {
140 type Task<T: Send + 'static> = E::Task<T>;
141
142 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
143 where
144 Fut: Future<Output: Send> + Send + 'static,
145 {
146 (**self).spawn(fut)
147 }
148}
149
150impl<E: Executor> Executor for alloc::sync::Arc<E> {
151 type Task<T: Send + 'static> = E::Task<T>;
152
153 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
154 where
155 Fut: Future<Output: Send> + Send + 'static,
156 {
157 (**self).spawn(fut)
158 }
159}
160
/// An executor that can spawn futures without requiring them to be `Send`.
///
/// [`spawn`](LocalExecutor::spawn) accepts any `'static` future and hands back a
/// [`Task`] handle of type [`Self::Task`](LocalExecutor::Task). Unlike [`Executor`],
/// neither the executor, the future, nor the handle carries a `Send`/`Sync` bound.
///
/// Bundled implementations (each listed only when its crate feature is enabled):
///
#[cfg_attr(feature = "web", doc = "- [`web::WebExecutor`] for WASM environments")]
pub trait LocalExecutor {
    /// Handle returned by [`spawn`](LocalExecutor::spawn): a [`Task`] yielding `T`.
    type Task<T: 'static>: Task<T>;

    /// Spawns `fut` on this executor and returns the handle for it.
    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
    where
        Fut: Future + 'static;
}
187
188impl<E: LocalExecutor> LocalExecutor for &E {
189 type Task<T: 'static> = E::Task<T>;
190
191 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
192 where
193 Fut: Future + 'static,
194 {
195 (*self).spawn(fut)
196 }
197}
198
199impl<E: LocalExecutor> LocalExecutor for &mut E {
200 type Task<T: 'static> = E::Task<T>;
201
202 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
203 where
204 Fut: Future + 'static,
205 {
206 (**self).spawn(fut)
207 }
208}
209
210impl<E: LocalExecutor> LocalExecutor for Box<E> {
211 type Task<T: 'static> = E::Task<T>;
212
213 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
214 where
215 Fut: Future + 'static,
216 {
217 (**self).spawn(fut)
218 }
219}
220
221impl<E: LocalExecutor> LocalExecutor for alloc::rc::Rc<E> {
222 type Task<T: 'static> = E::Task<T>;
223
224 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
225 where
226 Fut: Future + 'static,
227 {
228 (**self).spawn(fut)
229 }
230}
231
232impl<E: LocalExecutor> LocalExecutor for alloc::sync::Arc<E> {
233 type Task<T: 'static> = E::Task<T>;
234
235 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
236 where
237 Fut: Future + 'static,
238 {
239 (**self).spawn(fut)
240 }
241}
242
/// Object-safe facade over [`LocalExecutor`] used by [`AnyLocalExecutor`].
///
/// The generic `spawn` of [`LocalExecutor`] is replaced by a method that takes and
/// returns boxed trait objects. `Any` is a supertrait so the boxed executor can be
/// downcast back to its concrete type.
trait AnyLocalExecutorImpl: 'static + Any {
    /// Spawns an already boxed, unit-output future and returns a boxed task handle.
    fn spawn_boxed(
        &self,
        fut: Pin<Box<dyn Future<Output = ()>>>,
    ) -> Pin<Box<dyn Task<()> + 'static>>;
}
249
250impl<E> AnyLocalExecutorImpl for E
251where
252 E: LocalExecutor + 'static,
253{
254 fn spawn_boxed(
255 &self,
256 fut: Pin<Box<dyn Future<Output = ()>>>,
257 ) -> Pin<Box<dyn Task<()> + 'static>> {
258 let task = self.spawn(fut);
259 Box::pin(task)
260 }
261}
262
/// Type-erased [`LocalExecutor`]: owns any `'static` local executor behind a boxed
/// private trait object, and can be downcast back to the concrete type.
pub struct AnyLocalExecutor(Box<dyn AnyLocalExecutorImpl>);
268
269impl Debug for AnyLocalExecutor {
270 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
271 f.debug_struct("AnyLocalExecutor").finish()
272 }
273}
274
275impl Debug for AnyExecutor {
276 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
277 f.debug_struct("AnyExecutor").finish()
278 }
279}
280
281impl AnyExecutor {
282 pub fn new(executor: impl Executor + 'static) -> Self {
284 Self(Box::new(executor))
285 }
286
287 pub fn downcast_ref<E: Executor + 'static>(&self) -> Option<&E> {
291 let any: &dyn Any = self.0.as_ref();
292
293 any.downcast_ref()
294 }
295
296 pub fn downcast<E: Executor + 'static>(self) -> Result<Box<E>, Self> {
301 if (&self.0 as &dyn Any).is::<E>() {
302 Ok((self.0 as Box<dyn Any>).downcast().ok().unwrap())
303 } else {
304 Err(self)
305 }
306 }
307}
308
309impl AnyLocalExecutor {
310 pub fn new(executor: impl LocalExecutor + 'static) -> Self {
312 Self(Box::new(executor))
313 }
314
315 pub fn downcast_ref<E: LocalExecutor + 'static>(&self) -> Option<&E> {
319 let any: &dyn Any = self.0.as_ref();
320
321 any.downcast_ref()
322 }
323
324 pub fn downcast<E: LocalExecutor + 'static>(self) -> Result<Box<E>, Self> {
329 if (&self.0 as &dyn Any).is::<E>() {
330 Ok((self.0 as Box<dyn Any>).downcast().ok().unwrap())
331 } else {
332 Err(self)
333 }
334 }
335}
336
/// Task handle returned when spawning through an [`AnyLocalExecutor`].
pub struct AnyLocalExecutorTask<T> {
    // Type-erased handle of the wrapper future spawned on the underlying executor.
    inner: Pin<Box<dyn Task<()> + 'static>>,
    // Receiving end of the channel over which the wrapper future sends the output.
    receiver: Receiver<Result<T, Error>>,
}
346
347impl<T> core::fmt::Debug for AnyLocalExecutorTask<T> {
348 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
349 f.debug_struct("AnyLocalExecutorTask")
350 .finish_non_exhaustive()
351 }
352}
353
354impl<T> Future for AnyLocalExecutorTask<T> {
355 type Output = T;
356
357 fn poll(
358 self: Pin<&mut Self>,
359 cx: &mut core::task::Context<'_>,
360 ) -> core::task::Poll<Self::Output> {
361 self.poll_result(cx).map(|res| res.unwrap())
362 }
363}
364
365impl<T> Task<T> for AnyLocalExecutorTask<T> {
366 fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
367 let this = unsafe { self.get_unchecked_mut() };
369 let _ = this.inner.as_mut().poll(cx);
370
371 let mut recv = this.receiver.recv();
373 unsafe {
374 Pin::new_unchecked(&mut recv)
375 .poll(cx)
376 .map(|res| res.unwrap_or_else(|_| Err(Box::new("Channel closed"))))
377 }
378 }
379 fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
380 let this = unsafe { self.get_unchecked_mut() };
381 this.inner.as_mut().poll_cancel(cx)
382 }
383}
384
385impl LocalExecutor for AnyLocalExecutor {
386 type Task<T: 'static> = AnyLocalExecutorTask<T>;
387
388 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
389 where
390 Fut: Future + 'static,
391 {
392 let (sender, receiver) = async_channel::bounded(1);
393 let fut = async move {
394 let res = AssertUnwindSafe(fut).await;
395 let _ = sender.send(Ok(res)).await;
396 };
397 let inner = self.0.spawn_boxed(Box::pin(fut));
398 AnyLocalExecutorTask { inner, receiver }
399 }
400}
401
/// Boxed, type-erased error payload carried in the `Err` arm of
/// [`Task::poll_result`].
type Error = Box<dyn core::any::Any + Send>;
406
/// A handle to spawned work.
///
/// A `Task<T>` is a future yielding `T` that additionally exposes polling hooks
/// for a fallible result and for cancellation.
pub trait Task<T>: Future<Output = T> {
    /// Polls the task for its outcome: `Ok(T)` on success, or `Err` with a boxed
    /// `Any + Send` payload on failure.
    fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>>;

    /// Polls cancellation of the task, resolving to `()` once it is done.
    // NOTE(review): exact cancellation semantics are defined by each implementor.
    fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;

    /// Consumes the task and returns a future that resolves to what
    /// [`poll_result`](Task::poll_result) yields.
    fn result(self) -> impl Future<Output = Result<T, Error>>
    where
        Self: Sized,
    {
        ResultFuture {
            task: self,
            _phantom: PhantomData,
        }
    }

    /// Consumes the task and returns a future that drives
    /// [`poll_cancel`](Task::poll_cancel) to completion.
    fn cancel(self) -> impl Future<Output = ()>
    where
        Self: Sized,
    {
        CancelFuture {
            task: self,
            _phantom: PhantomData,
        }
    }
}
457
/// Future built by [`Task::result`]; resolves to `Result<U, Error>` by polling
/// [`Task::poll_result`] on the wrapped task.
pub struct ResultFuture<T: Task<U>, U> {
    // The task being driven.
    task: T,
    // Ties the otherwise unused output type `U` to this struct.
    _phantom: PhantomData<U>,
}
466
467impl<T: Task<U>, U> core::fmt::Debug for ResultFuture<T, U> {
468 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
469 f.debug_struct("ResultFuture").finish_non_exhaustive()
470 }
471}
472
473impl<T: Task<U>, U> Future for ResultFuture<T, U> {
474 type Output = Result<U, Error>;
475
476 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
477 let this = unsafe { self.get_unchecked_mut() };
478 unsafe { Pin::new_unchecked(&mut this.task) }.poll_result(cx)
479 }
480}
481
/// Future built by [`Task::cancel`]; resolves to `()` by polling
/// [`Task::poll_cancel`] on the wrapped task.
pub struct CancelFuture<T: Task<U>, U> {
    // The task being cancelled.
    task: T,
    // Ties the otherwise unused output type `U` to this struct.
    _phantom: PhantomData<U>,
}
491
492impl<T: Task<U>, U> core::fmt::Debug for CancelFuture<T, U> {
493 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
494 f.debug_struct("CancelFuture").finish_non_exhaustive()
495 }
496}
497
498impl<T: Task<U>, U> Future for CancelFuture<T, U> {
499 type Output = ();
500
501 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
502 let this = unsafe { self.get_unchecked_mut() };
503 unsafe { Pin::new_unchecked(&mut this.task) }.poll_cancel(cx)
504 }
505}
506
/// Type-erased [`Executor`]: owns any `'static` executor behind a boxed private
/// trait object, and can be downcast back to the concrete type.
pub struct AnyExecutor(Box<dyn AnyExecutorImpl>);
513
/// Task handle returned when spawning through an [`AnyExecutor`].
pub struct AnyExecutorTask<T> {
    // Type-erased handle of the wrapper future spawned on the underlying executor.
    inner: Pin<Box<dyn Task<()> + Send>>,
    // Receiving end of the channel over which the wrapper future sends the output.
    receiver: Receiver<Result<T, Error>>,
}
522
523impl<T> core::fmt::Debug for AnyExecutorTask<T> {
524 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
525 f.debug_struct("AnyExecutorTask").finish_non_exhaustive()
526 }
527}
528
529impl<T: Send> Future for AnyExecutorTask<T> {
530 type Output = T;
531
532 fn poll(
533 self: Pin<&mut Self>,
534 cx: &mut core::task::Context<'_>,
535 ) -> core::task::Poll<Self::Output> {
536 self.poll_result(cx).map(|res| res.unwrap())
537 }
538}
539
540impl<T: Send> Task<T> for AnyExecutorTask<T> {
541 fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
542 let this = unsafe { self.get_unchecked_mut() };
544 let _ = this.inner.as_mut().poll(cx);
545
546 let mut recv = this.receiver.recv();
548 unsafe {
549 Pin::new_unchecked(&mut recv)
550 .poll(cx)
551 .map(|res| res.unwrap_or_else(|_| Err(Box::new("Channel closed"))))
552 }
553 }
554 fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
555 let this = unsafe { self.get_unchecked_mut() };
556 this.inner.as_mut().poll_cancel(cx)
557 }
558}
559
560impl Executor for AnyExecutor {
561 type Task<T: Send + 'static> = AnyExecutorTask<T>;
562
563 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
564 where
565 Fut: Future<Output: Send> + Send + 'static,
566 {
567 let (sender, receiver) = async_channel::bounded(1);
568 let fut = async move {
569 let res = AssertUnwindSafe(fut).await;
570 let _ = sender.send(Ok(res)).await;
571 };
572 let inner = self.0.spawn_boxed(Box::pin(fut));
573 AnyExecutorTask { inner, receiver }
574 }
575}
576
/// Object-safe facade over [`Executor`] used by [`AnyExecutor`].
///
/// The generic `spawn` of [`Executor`] is replaced by a method that takes and
/// returns boxed trait objects. `Any` is a supertrait so the boxed executor can be
/// downcast back to its concrete type.
trait AnyExecutorImpl: Send + Sync + Any {
    /// Spawns an already boxed, unit-output `Send` future and returns a boxed,
    /// `Send` task handle.
    fn spawn_boxed(
        &self,
        fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> Pin<Box<dyn Task<()> + Send>>;
}
583
584impl<T: Task<T>> Task<T> for Pin<Box<T>> {
585 fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
586 let this = unsafe { self.get_unchecked_mut() };
587 this.as_mut().poll_result(cx)
588 }
589 fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
590 let this = unsafe { self.get_unchecked_mut() };
591
592 this.as_mut().poll_cancel(cx)
593 }
594}
595
596impl<E> AnyExecutorImpl for E
597where
598 E: Executor + 'static,
599{
600 fn spawn_boxed(
601 &self,
602 fut: Pin<Box<dyn Future<Output = ()> + Send>>,
603 ) -> Pin<Box<dyn Task<()> + Send>> {
604 let task = self.spawn(fut);
605 Box::pin(task)
606 }
607}
608
609mod std_on {
610 use alloc::boxed::Box;
611
612 use crate::{
613 AnyExecutor, AnyExecutorTask, AnyLocalExecutor, AnyLocalExecutorTask, Executor,
614 LocalExecutor,
615 };
616
617 extern crate std;
618
619 use core::{cell::OnceCell, future::Future, panic::AssertUnwindSafe};
620 use std::sync::OnceLock;
621 std::thread_local! {
622 static LOCAL_EXECUTOR: OnceCell<AnyLocalExecutor> = const { OnceCell::new() };
623 }
624
625 pub fn init_local_executor(executor: impl LocalExecutor + 'static) {
635 if try_init_local_executor(executor).is_err() {
636 panic!("Local executor already set for this thread");
637 }
638 }
639
640 pub fn try_init_local_executor<E>(executor: E) -> Result<(), E>
644 where
645 E: LocalExecutor + 'static,
646 {
647 LOCAL_EXECUTOR.with(|cell| {
648 cell.set(AnyLocalExecutor::new(executor))
649 .map_err(|e| *e.downcast().unwrap())
650 })
651 }
652
653 static GLOBAL_EXECUTOR: OnceLock<AnyExecutor> = OnceLock::new();
654
655 pub fn init_global_executor(executor: impl crate::Executor + 'static) {
665 if GLOBAL_EXECUTOR.set(AnyExecutor::new(executor)).is_err() {
666 panic!("Global executor already set");
667 }
668 }
669
670 pub fn try_init_global_executor<E>(executor: E) -> Result<(), E>
674 where
675 E: crate::Executor + 'static,
676 {
677 GLOBAL_EXECUTOR
678 .set(AnyExecutor::new(executor))
679 .map_err(|e| *e.downcast().unwrap())
680 }
681
682 pub fn spawn<Fut>(fut: Fut) -> AnyExecutorTask<Fut::Output>
692 where
693 Fut: Future<Output: Send> + Send + 'static,
694 {
695 let executor = GLOBAL_EXECUTOR.get().expect("Global executor not set");
696 executor.spawn(fut)
697 }
698
699 pub fn spawn_local<Fut>(fut: Fut) -> AnyLocalExecutorTask<Fut::Output>
710 where
711 Fut: Future + 'static,
712 {
713 LOCAL_EXECUTOR.with(|cell| {
714 let executor = cell.get().expect("Local executor not set");
715 executor.spawn(fut)
716 })
717 }
718
719 #[allow(unused)]
720 pub(crate) fn catch_unwind<F, R>(f: F) -> Result<R, Box<dyn std::any::Any + Send>>
721 where
722 F: FnOnce() -> R,
723 {
724 std::panic::catch_unwind(AssertUnwindSafe(f))
725 }
726
727 #[derive(Clone, Copy, Debug)]
732 pub struct DefaultExecutor;
733
734 impl Executor for DefaultExecutor {
735 type Task<T: Send + 'static> = AnyExecutorTask<T>;
736
737 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
738 where
739 Fut: core::future::Future<Output: Send> + Send + 'static,
740 {
741 spawn(fut)
742 }
743 }
744
745 impl LocalExecutor for DefaultExecutor {
746 type Task<T: 'static> = AnyLocalExecutorTask<T>;
747
748 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
749 where
750 Fut: core::future::Future + 'static,
751 {
752 spawn_local(fut)
753 }
754 }
755}
756
757pub use std_on::*;
758
759#[cfg(feature = "async-executor")]
761#[cfg_attr(docsrs, doc(cfg(feature = "async-executor")))]
762pub use async_executor::AsyncTask;
763
764#[cfg(feature = "tokio")]
766#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
767pub use tokio::{TokioExecutor, TokioLocalTask, TokioTask};
768
769#[cfg(feature = "web")]
771#[cfg_attr(docsrs, doc(cfg(feature = "web")))]
772pub use web::{WebExecutor, WebTask};