1#![no_std]
60#![cfg_attr(docsrs, feature(doc_cfg))]
61#![warn(missing_docs, missing_debug_implementations)]
62
63#[cfg(feature = "async-task")]
64#[cfg_attr(docsrs, doc(cfg(feature = "async-task")))]
65pub mod async_task;
66
67#[cfg(feature = "async-executor")]
68#[cfg_attr(docsrs, doc(cfg(feature = "async-executor")))]
69pub mod async_executor;
70
71#[cfg(feature = "tokio")]
72#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
73pub mod tokio;
74
75#[cfg(feature = "web")]
76#[cfg_attr(docsrs, doc(cfg(feature = "web")))]
77pub mod web;
78
79use core::{
80 any::Any,
81 fmt::Debug,
82 future::Future,
83 marker::PhantomData,
84 panic::AssertUnwindSafe,
85 pin::Pin,
86 task::{Context, Poll},
87};
88
89use alloc::boxed::Box;
90use async_channel::Receiver;
91
92extern crate alloc;
93
94#[cfg_attr(feature = "web", doc = "- [`web::WebExecutor`] for WASM environments")]
105pub trait Executor: Send + Sync {
106 type Task<T: Send + 'static>: Task<T> + Send;
111
112 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
117 where
118 Fut: Future<Output: Send> + Send + 'static;
119}
120
121impl<E: Executor> Executor for &E {
122 type Task<T: Send + 'static> = E::Task<T>;
123
124 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
125 where
126 Fut: Future<Output: Send> + Send + 'static,
127 {
128 (*self).spawn(fut)
129 }
130}
131
132impl<E: Executor> Executor for &mut E {
133 type Task<T: Send + 'static> = E::Task<T>;
134
135 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
136 where
137 Fut: Future<Output: Send> + Send + 'static,
138 {
139 (**self).spawn(fut)
140 }
141}
142
143impl<E: Executor> Executor for Box<E> {
144 type Task<T: Send + 'static> = E::Task<T>;
145
146 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
147 where
148 Fut: Future<Output: Send> + Send + 'static,
149 {
150 (**self).spawn(fut)
151 }
152}
153
154impl<E: Executor> Executor for alloc::sync::Arc<E> {
155 type Task<T: Send + 'static> = E::Task<T>;
156
157 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
158 where
159 Fut: Future<Output: Send> + Send + 'static,
160 {
161 (**self).spawn(fut)
162 }
163}
164
165#[cfg_attr(feature = "web", doc = "- [`web::WebExecutor`] for WASM environments")]
176pub trait LocalExecutor {
177 type Task<T: 'static>: Task<T>;
182
183 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
188 where
189 Fut: Future + 'static;
190}
191
192impl<E: LocalExecutor> LocalExecutor for &E {
193 type Task<T: 'static> = E::Task<T>;
194
195 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
196 where
197 Fut: Future + 'static,
198 {
199 (*self).spawn_local(fut)
200 }
201}
202
203impl<E: LocalExecutor> LocalExecutor for &mut E {
204 type Task<T: 'static> = E::Task<T>;
205
206 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
207 where
208 Fut: Future + 'static,
209 {
210 (**self).spawn_local(fut)
211 }
212}
213
214impl<E: LocalExecutor> LocalExecutor for Box<E> {
215 type Task<T: 'static> = E::Task<T>;
216
217 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
218 where
219 Fut: Future + 'static,
220 {
221 (**self).spawn_local(fut)
222 }
223}
224
225impl<E: LocalExecutor> LocalExecutor for alloc::rc::Rc<E> {
226 type Task<T: 'static> = E::Task<T>;
227
228 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
229 where
230 Fut: Future + 'static,
231 {
232 (**self).spawn_local(fut)
233 }
234}
235
236impl<E: LocalExecutor> LocalExecutor for alloc::sync::Arc<E> {
237 type Task<T: 'static> = E::Task<T>;
238
239 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
240 where
241 Fut: Future + 'static,
242 {
243 (**self).spawn_local(fut)
244 }
245}
246
247trait AnyLocalExecutorImpl: 'static + Any {
248 fn spawn_local_boxed(
249 &self,
250 fut: Pin<Box<dyn Future<Output = ()>>>,
251 ) -> Pin<Box<dyn Task<()> + 'static>>;
252}
253
254impl<E> AnyLocalExecutorImpl for E
255where
256 E: LocalExecutor + 'static,
257{
258 fn spawn_local_boxed(
259 &self,
260 fut: Pin<Box<dyn Future<Output = ()>>>,
261 ) -> Pin<Box<dyn Task<()> + 'static>> {
262 let task = self.spawn_local(fut);
263 Box::pin(task)
264 }
265}
266
267pub struct AnyLocalExecutor(Box<dyn AnyLocalExecutorImpl>);
272
273impl Debug for AnyLocalExecutor {
274 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
275 f.debug_struct("AnyLocalExecutor").finish()
276 }
277}
278
279impl Debug for AnyExecutor {
280 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
281 f.debug_struct("AnyExecutor").finish()
282 }
283}
284
285impl AnyExecutor {
286 pub fn new(executor: impl Executor + 'static) -> Self {
288 Self(Box::new(executor))
289 }
290
291 pub fn downcast_ref<E: Executor + 'static>(&self) -> Option<&E> {
295 let any: &dyn Any = self.0.as_ref();
296
297 any.downcast_ref()
298 }
299
300 pub fn downcast<E: Executor + 'static>(self) -> Result<Box<E>, Self> {
305 if (&self.0 as &dyn Any).is::<E>() {
306 Ok((self.0 as Box<dyn Any>).downcast().ok().unwrap())
307 } else {
308 Err(self)
309 }
310 }
311}
312
313impl AnyLocalExecutor {
314 pub fn new(executor: impl LocalExecutor + 'static) -> Self {
316 Self(Box::new(executor))
317 }
318
319 pub fn downcast_ref<E: LocalExecutor + 'static>(&self) -> Option<&E> {
323 let any: &dyn Any = self.0.as_ref();
324
325 any.downcast_ref()
326 }
327
328 pub fn downcast<E: LocalExecutor + 'static>(self) -> Result<Box<E>, Self> {
333 if (&self.0 as &dyn Any).is::<E>() {
334 Ok((self.0 as Box<dyn Any>).downcast().ok().unwrap())
335 } else {
336 Err(self)
337 }
338 }
339}
340
341pub struct AnyLocalExecutorTask<T> {
347 inner: Pin<Box<dyn Task<()> + 'static>>,
348 receiver: Receiver<Result<T, Error>>,
349}
350
351impl<T> core::fmt::Debug for AnyLocalExecutorTask<T> {
352 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
353 f.debug_struct("AnyLocalExecutorTask")
354 .finish_non_exhaustive()
355 }
356}
357
358impl<T> Future for AnyLocalExecutorTask<T> {
359 type Output = T;
360
361 fn poll(
362 self: Pin<&mut Self>,
363 cx: &mut core::task::Context<'_>,
364 ) -> core::task::Poll<Self::Output> {
365 self.poll_result(cx).map(|res| res.unwrap())
366 }
367}
368
369impl<T> Task<T> for AnyLocalExecutorTask<T> {
370 fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
371 let this = unsafe { self.get_unchecked_mut() };
373 let _ = this.inner.as_mut().poll(cx);
374
375 let mut recv = this.receiver.recv();
377 unsafe {
378 Pin::new_unchecked(&mut recv)
379 .poll(cx)
380 .map(|res| res.unwrap_or_else(|_| Err(Box::new("Channel closed"))))
381 }
382 }
383 fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
384 let this = unsafe { self.get_unchecked_mut() };
385 this.inner.as_mut().poll_cancel(cx)
386 }
387}
388
389impl LocalExecutor for AnyLocalExecutor {
390 type Task<T: 'static> = AnyLocalExecutorTask<T>;
391
392 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
393 where
394 Fut: Future + 'static,
395 {
396 let (sender, receiver) = async_channel::bounded(1);
397 let fut = async move {
398 let res = AssertUnwindSafe(fut).await;
399 let _ = sender.send(Ok(res)).await;
400 };
401 let inner = self.0.spawn_local_boxed(Box::pin(fut));
402 AnyLocalExecutorTask { inner, receiver }
403 }
404}
405
406type Error = Box<dyn core::any::Any + Send>;
410
411pub trait Task<T>: Future<Output = T> {
419 fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>>;
424
425 fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;
430
431 fn result(self) -> impl Future<Output = Result<T, Error>>
437 where
438 Self: Sized,
439 {
440 ResultFuture {
441 task: self,
442 _phantom: PhantomData,
443 }
444 }
445
446 fn detach(self)
452 where
453 Self: Sized,
454 {
455 core::mem::drop(self);
458 }
459
460 fn cancel(self) -> impl Future<Output = ()>
466 where
467 Self: Sized,
468 {
469 CancelFuture {
470 task: self,
471 _phantom: PhantomData,
472 }
473 }
474}
475
476pub struct ResultFuture<T: Task<U>, U> {
481 task: T,
482 _phantom: PhantomData<U>,
483}
484
485impl<T: Task<U>, U> core::fmt::Debug for ResultFuture<T, U> {
486 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
487 f.debug_struct("ResultFuture").finish_non_exhaustive()
488 }
489}
490
491impl<T: Task<U>, U> Future for ResultFuture<T, U> {
492 type Output = Result<U, Error>;
493
494 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
495 let this = unsafe { self.get_unchecked_mut() };
496 unsafe { Pin::new_unchecked(&mut this.task) }.poll_result(cx)
497 }
498}
499
500pub struct CancelFuture<T: Task<U>, U> {
506 task: T,
507 _phantom: PhantomData<U>,
508}
509
510impl<T: Task<U>, U> core::fmt::Debug for CancelFuture<T, U> {
511 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
512 f.debug_struct("CancelFuture").finish_non_exhaustive()
513 }
514}
515
516impl<T: Task<U>, U> Future for CancelFuture<T, U> {
517 type Output = ();
518
519 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
520 let this = unsafe { self.get_unchecked_mut() };
521 unsafe { Pin::new_unchecked(&mut this.task) }.poll_cancel(cx)
522 }
523}
524
525pub struct AnyExecutor(Box<dyn AnyExecutorImpl>);
531
532pub struct AnyExecutorTask<T> {
537 inner: Pin<Box<dyn Task<()> + Send>>,
538 receiver: Receiver<Result<T, Error>>,
539}
540
541impl<T> core::fmt::Debug for AnyExecutorTask<T> {
542 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
543 f.debug_struct("AnyExecutorTask").finish_non_exhaustive()
544 }
545}
546
547impl<T: Send> Future for AnyExecutorTask<T> {
548 type Output = T;
549
550 fn poll(
551 self: Pin<&mut Self>,
552 cx: &mut core::task::Context<'_>,
553 ) -> core::task::Poll<Self::Output> {
554 self.poll_result(cx).map(|res| res.unwrap())
555 }
556}
557
558impl<T: Send> Task<T> for AnyExecutorTask<T> {
559 fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
560 let this = unsafe { self.get_unchecked_mut() };
562 let _ = this.inner.as_mut().poll(cx);
563
564 let mut recv = this.receiver.recv();
566 unsafe {
567 Pin::new_unchecked(&mut recv)
568 .poll(cx)
569 .map(|res| res.unwrap_or_else(|_| Err(Box::new("Channel closed"))))
570 }
571 }
572 fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
573 let this = unsafe { self.get_unchecked_mut() };
574 this.inner.as_mut().poll_cancel(cx)
575 }
576}
577
578impl Executor for AnyExecutor {
579 type Task<T: Send + 'static> = AnyExecutorTask<T>;
580
581 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
582 where
583 Fut: Future<Output: Send> + Send + 'static,
584 {
585 let (sender, receiver) = async_channel::bounded(1);
586 let fut = async move {
587 let res = AssertUnwindSafe(fut).await;
588 let _ = sender.send(Ok(res)).await;
589 };
590 let inner = self.0.spawn_boxed(Box::pin(fut));
591 AnyExecutorTask { inner, receiver }
592 }
593}
594
595trait AnyExecutorImpl: Send + Sync + Any {
596 fn spawn_boxed(
597 &self,
598 fut: Pin<Box<dyn Future<Output = ()> + Send>>,
599 ) -> Pin<Box<dyn Task<()> + Send>>;
600}
601
602impl<T: Task<T>> Task<T> for Pin<Box<T>> {
603 fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
604 let this = unsafe { self.get_unchecked_mut() };
605 this.as_mut().poll_result(cx)
606 }
607 fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
608 let this = unsafe { self.get_unchecked_mut() };
609
610 this.as_mut().poll_cancel(cx)
611 }
612}
613
614impl<E> AnyExecutorImpl for E
615where
616 E: Executor + 'static,
617{
618 fn spawn_boxed(
619 &self,
620 fut: Pin<Box<dyn Future<Output = ()> + Send>>,
621 ) -> Pin<Box<dyn Task<()> + Send>> {
622 let task = self.spawn(fut);
623 Box::pin(task)
624 }
625}
626
627#[cfg(feature = "std")]
628mod std_on {
629 use alloc::boxed::Box;
630
631 use crate::{
632 AnyExecutor, AnyExecutorTask, AnyLocalExecutor, AnyLocalExecutorTask, Executor,
633 LocalExecutor,
634 };
635
636 extern crate std;
637
638 use core::{cell::OnceCell, future::Future, panic::AssertUnwindSafe};
639 use std::sync::OnceLock;
640 std::thread_local! {
641 static LOCAL_EXECUTOR: OnceCell<AnyLocalExecutor> = const { OnceCell::new() };
642 }
643
644 pub fn init_local_executor(executor: impl LocalExecutor + 'static) {
654 if try_init_local_executor(executor).is_err() {
655 panic!("Local executor already set for this thread");
656 }
657 }
658
659 pub fn try_init_local_executor<E>(executor: E) -> Result<(), E>
663 where
664 E: LocalExecutor + 'static,
665 {
666 LOCAL_EXECUTOR.with(|cell| {
667 cell.set(AnyLocalExecutor::new(executor))
668 .map_err(|e| *e.downcast().unwrap())
669 })
670 }
671
672 static GLOBAL_EXECUTOR: OnceLock<AnyExecutor> = OnceLock::new();
673
674 pub fn init_global_executor(executor: impl crate::Executor + 'static) {
684 if GLOBAL_EXECUTOR.set(AnyExecutor::new(executor)).is_err() {
685 panic!("Global executor already set");
686 }
687 }
688
689 pub fn try_init_global_executor<E>(executor: E) -> Result<(), E>
693 where
694 E: crate::Executor + 'static,
695 {
696 GLOBAL_EXECUTOR
697 .set(AnyExecutor::new(executor))
698 .map_err(|e| *e.downcast().unwrap())
699 }
700
701 pub fn spawn<Fut>(fut: Fut) -> AnyExecutorTask<Fut::Output>
711 where
712 Fut: Future<Output: Send> + Send + 'static,
713 {
714 let executor = GLOBAL_EXECUTOR.get().expect("Global executor not set");
715 executor.spawn(fut)
716 }
717
718 pub fn spawn_local<Fut>(fut: Fut) -> AnyLocalExecutorTask<Fut::Output>
729 where
730 Fut: Future + 'static,
731 {
732 LOCAL_EXECUTOR.with(|cell| {
733 let executor = cell.get().expect("Local executor not set");
734 executor.spawn_local(fut)
735 })
736 }
737
738 #[allow(unused)]
739 pub(crate) fn catch_unwind<F, R>(f: F) -> Result<R, Box<dyn std::any::Any + Send>>
740 where
741 F: FnOnce() -> R,
742 {
743 std::panic::catch_unwind(AssertUnwindSafe(f))
744 }
745
746 #[derive(Clone, Copy, Debug)]
751 pub struct DefaultExecutor;
752
753 impl Executor for DefaultExecutor {
754 type Task<T: Send + 'static> = AnyExecutorTask<T>;
755
756 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
757 where
758 Fut: core::future::Future<Output: Send> + Send + 'static,
759 {
760 spawn(fut)
761 }
762 }
763
764 impl LocalExecutor for DefaultExecutor {
765 type Task<T: 'static> = AnyLocalExecutorTask<T>;
766
767 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
768 where
769 Fut: core::future::Future + 'static,
770 {
771 spawn_local(fut)
772 }
773 }
774}
775
776#[cfg(feature = "std")]
777pub use std_on::*;