1#![no_std]
37#![cfg_attr(docsrs, feature(doc_cfg))]
38#![warn(missing_docs, missing_debug_implementations)]
39
40#[cfg(feature = "async-task")]
41#[cfg_attr(docsrs, doc(cfg(feature = "async-task")))]
42pub mod async_task;
43
44#[cfg(feature = "async-executor")]
45#[cfg_attr(docsrs, doc(cfg(feature = "async-executor")))]
46pub mod async_executor;
47
48#[cfg(feature = "tokio")]
49#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
50pub mod tokio;
51
52use core::{
53 any::Any,
54 fmt::Debug,
55 future::{Future, poll_fn},
56 marker::PhantomData,
57 panic::AssertUnwindSafe,
58 pin::Pin,
59 task::{Context, Poll},
60};
61
62pub mod mailbox;
63
64use alloc::boxed::Box;
65use async_channel::Receiver;
66
67extern crate alloc;
68
/// An executor that accepts `Send + 'static` futures.
///
/// Implementors must themselves be `Send + Sync`.
pub trait Executor: Send + Sync {
    /// Handle type returned by [`Executor::spawn`]; a [`Task`] producing `T`
    /// that is itself `Send`.
    type Task<T: Send + 'static>: Task<T> + Send;

    /// Hands `fut` to the executor and returns a task handle for its output.
    ///
    /// The future, and its output, must be `Send`, and the future must be `'static`.
    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
    where
        Fut: Future<Output: Send> + Send + 'static;
}
94
95impl<E: Executor> Executor for &E {
96 type Task<T: Send + 'static> = E::Task<T>;
97
98 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
99 where
100 Fut: Future<Output: Send> + Send + 'static,
101 {
102 (*self).spawn(fut)
103 }
104}
105
106impl<E: Executor> Executor for &mut E {
107 type Task<T: Send + 'static> = E::Task<T>;
108
109 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
110 where
111 Fut: Future<Output: Send> + Send + 'static,
112 {
113 (**self).spawn(fut)
114 }
115}
116
117impl<E: Executor> Executor for Box<E> {
118 type Task<T: Send + 'static> = E::Task<T>;
119
120 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
121 where
122 Fut: Future<Output: Send> + Send + 'static,
123 {
124 (**self).spawn(fut)
125 }
126}
127
128impl<E: Executor> Executor for alloc::sync::Arc<E> {
129 type Task<T: Send + 'static> = E::Task<T>;
130
131 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
132 where
133 Fut: Future<Output: Send> + Send + 'static,
134 {
135 (**self).spawn(fut)
136 }
137}
138
/// An executor that accepts `'static` futures without requiring them to be `Send`.
pub trait LocalExecutor {
    /// Handle type returned by [`LocalExecutor::spawn_local`]; a [`Task`] producing `T`.
    type Task<T: 'static>: Task<T>;

    /// Hands `fut` to the executor and returns a task handle for its output.
    ///
    /// Unlike [`Executor::spawn`], neither the future nor its output needs to be `Send`.
    fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
    where
        Fut: Future + 'static;
}
164
165impl<E: LocalExecutor> LocalExecutor for &E {
166 type Task<T: 'static> = E::Task<T>;
167
168 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
169 where
170 Fut: Future + 'static,
171 {
172 (*self).spawn_local(fut)
173 }
174}
175
176impl<E: LocalExecutor> LocalExecutor for &mut E {
177 type Task<T: 'static> = E::Task<T>;
178
179 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
180 where
181 Fut: Future + 'static,
182 {
183 (**self).spawn_local(fut)
184 }
185}
186
187impl<E: LocalExecutor> LocalExecutor for Box<E> {
188 type Task<T: 'static> = E::Task<T>;
189
190 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
191 where
192 Fut: Future + 'static,
193 {
194 (**self).spawn_local(fut)
195 }
196}
197
198impl<E: LocalExecutor> LocalExecutor for alloc::rc::Rc<E> {
199 type Task<T: 'static> = E::Task<T>;
200
201 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
202 where
203 Fut: Future + 'static,
204 {
205 (**self).spawn_local(fut)
206 }
207}
208
209impl<E: LocalExecutor> LocalExecutor for alloc::sync::Arc<E> {
210 type Task<T: 'static> = E::Task<T>;
211
212 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
213 where
214 Fut: Future + 'static,
215 {
216 (**self).spawn_local(fut)
217 }
218}
219
/// Object-safe counterpart of [`LocalExecutor`] used behind [`AnyLocalExecutor`].
///
/// The generic `spawn_local` is replaced by a method taking a boxed `()`-output
/// future, so the trait can be used as `dyn AnyLocalExecutorImpl`.
trait AnyLocalExecutorImpl: 'static + Any {
    /// Spawns an already boxed, pinned future and returns the resulting task
    /// as a boxed, pinned trait object.
    fn spawn_local_boxed(
        &self,
        fut: Pin<Box<dyn Future<Output = ()>>>,
    ) -> Pin<Box<dyn Task<()> + 'static>>;
    /// Exposes the implementor as `&dyn Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any;
}
227
228impl<E> AnyLocalExecutorImpl for E
229where
230 E: LocalExecutor + 'static,
231{
232 fn spawn_local_boxed(
233 &self,
234 fut: Pin<Box<dyn Future<Output = ()>>>,
235 ) -> Pin<Box<dyn Task<()> + 'static>> {
236 let task = self.spawn_local(fut);
237 Box::pin(task)
238 }
239
240 fn as_any(&self) -> &dyn Any {
241 self
242 }
243}
244
/// A type-erased [`LocalExecutor`].
///
/// Holds any `'static` local executor behind a boxed trait object; the concrete
/// type can be recovered with `downcast_ref` / `downcast`.
pub struct AnyLocalExecutor(Box<dyn AnyLocalExecutorImpl>);
250
251impl Debug for AnyLocalExecutor {
252 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
253 f.debug_struct("AnyLocalExecutor").finish()
254 }
255}
256
257impl Debug for AnyExecutor {
258 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
259 f.debug_struct("AnyExecutor").finish()
260 }
261}
262
263impl<T> dyn Task<T> {
264 pub async fn result(self: Box<Self>) -> Result<T, Error> {
269 let mut pinned: Pin<Box<Self>> = self.into();
270
271 poll_fn(move |cx| pinned.as_mut().poll_result(cx)).await
272 }
273}
274
275impl AnyExecutor {
276 pub fn new(executor: impl Executor + 'static) -> Self {
278 Self(Box::new(executor))
279 }
280
281 pub fn downcast_ref<E: Executor + 'static>(&self) -> Option<&E> {
285 self.0.as_any().downcast_ref()
286 }
287
288 pub fn downcast<E: Executor + 'static>(self) -> Result<Box<E>, Self> {
293 if let Some(executor) = self.0.as_any().downcast_ref::<E>() {
294 let ptr: *const E = executor;
295 let boxed: Box<E> = unsafe { Box::from_raw(ptr as *mut E) };
297 core::mem::forget(self);
298 Ok(boxed)
299 } else {
300 Err(self)
301 }
302 }
303}
304
305impl AnyLocalExecutor {
306 pub fn new(executor: impl LocalExecutor + 'static) -> Self {
308 Self(Box::new(executor))
309 }
310
311 pub fn downcast_ref<E: LocalExecutor + 'static>(&self) -> Option<&E> {
315 self.0.as_any().downcast_ref()
316 }
317
318 pub fn downcast<E: LocalExecutor + 'static>(self) -> Result<Box<E>, Self> {
323 if let Some(executor) = self.0.as_any().downcast_ref::<E>() {
324 let ptr: *const E = executor;
325 let boxed: Box<E> = unsafe { Box::from_raw(ptr as *mut E) };
327 core::mem::forget(self);
328 Ok(boxed)
329 } else {
330 Err(self)
331 }
332 }
333}
334
/// Task handle returned by [`AnyLocalExecutor`]'s `spawn_local`.
///
/// The wrapped executor only ever sees a `()`-output future; the real output
/// travels back through a channel.
pub struct AnyLocalExecutorTask<T> {
    // Type-erased task returned by the wrapped executor for the `()`-output wrapper future.
    inner: Pin<Box<dyn Task<()> + 'static>>,
    // Receiving end of the channel on which the wrapper future sends the real output.
    receiver: Receiver<Result<T, Error>>,
}
344
345impl<T> AnyLocalExecutorTask<T> {
346 fn new(inner: Pin<Box<dyn Task<()> + 'static>>, receiver: Receiver<Result<T, Error>>) -> Self {
348 Self { inner, receiver }
349 }
350
351 pub async fn result(self) -> Result<T, Error> {
353 <Self as Task<T>>::result(self).await
354 }
355
356 pub fn detach(self) {
358 <Self as Task<T>>::detach(self)
359 }
360}
361
362impl<T> core::fmt::Debug for AnyLocalExecutorTask<T> {
363 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
364 f.debug_struct("AnyLocalExecutorTask")
365 .finish_non_exhaustive()
366 }
367}
368
369impl<T> Future for AnyLocalExecutorTask<T> {
370 type Output = T;
371
372 fn poll(
373 self: Pin<&mut Self>,
374 cx: &mut core::task::Context<'_>,
375 ) -> core::task::Poll<Self::Output> {
376 self.poll_result(cx).map(|res| res.unwrap())
377 }
378}
379
380impl<T> Task<T> for AnyLocalExecutorTask<T> {
381 fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
382 let this = unsafe { self.get_unchecked_mut() };
384 let _ = this.inner.as_mut().poll(cx);
385
386 let mut recv = this.receiver.recv();
388 unsafe {
389 Pin::new_unchecked(&mut recv)
390 .poll(cx)
391 .map(|res| res.unwrap_or_else(|_| Err(Box::new("Channel closed"))))
392 }
393 }
394}
395
396impl LocalExecutor for AnyLocalExecutor {
397 type Task<T: 'static> = AnyLocalExecutorTask<T>;
398
399 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
400 where
401 Fut: Future + 'static,
402 {
403 let (sender, receiver) = async_channel::bounded(1);
404 let fut = async move {
405 let res = AssertUnwindSafe(fut).await;
406 let _ = sender.send(Ok(res)).await;
407 };
408 let inner = self.0.spawn_local_boxed(Box::pin(fut));
409 AnyLocalExecutorTask::new(inner, receiver)
410 }
411}
412
/// Type-erased error value reported by [`Task::poll_result`].
///
/// This is the same shape as the payload type returned by `std::panic::catch_unwind`.
type Error = Box<dyn core::any::Any + Send>;
417
/// A handle to spawned work that yields a `T`.
///
/// A task is itself a [`Future`] with `Output = T`; [`Task::poll_result`] and
/// [`Task::result`] expose the outcome as a `Result` instead.
pub trait Task<T>: Future<Output = T> {
    /// Polls the task, reporting its outcome as `Ok(value)` or `Err(error)`.
    fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>>;

    /// Converts the task into a future that resolves to the outcome as a `Result`.
    ///
    /// The default implementation wraps `self` in a [`ResultFuture`].
    fn result(self) -> impl Future<Output = Result<T, Error>>
    where
        Self: Sized,
    {
        ResultFuture {
            task: self,
            _phantom: PhantomData,
        }
    }

    /// Gives up the handle without waiting for the task.
    ///
    /// The default implementation forgets `self`, so the handle's destructor never runs.
    // NOTE(review): what that means for the spawned work depends on the implementor — confirm per executor.
    fn detach(self)
    where
        Self: Sized,
    {
        core::mem::forget(self);
    }
}
459
/// Future returned by the default [`Task::result`]; resolves to
/// `Result<U, Error>` by calling [`Task::poll_result`] on the wrapped task.
pub struct ResultFuture<T: Task<U>, U> {
    // The task being driven.
    task: T,
    // Ties the output type `U` to this struct without storing a `U`.
    _phantom: PhantomData<U>,
}
468
469impl<T: Task<U>, U> core::fmt::Debug for ResultFuture<T, U> {
470 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
471 f.debug_struct("ResultFuture").finish_non_exhaustive()
472 }
473}
474
475impl<T: Task<U>, U> Future for ResultFuture<T, U> {
476 type Output = Result<U, Error>;
477
478 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
479 let this = unsafe { self.get_unchecked_mut() };
480 unsafe { Pin::new_unchecked(&mut this.task) }.poll_result(cx)
481 }
482}
483
/// A type-erased [`Executor`].
///
/// Holds any `'static` executor behind a boxed trait object; the concrete type
/// can be recovered with `downcast_ref` / `downcast`.
pub struct AnyExecutor(Box<dyn AnyExecutorImpl>);
490
/// Task handle returned by [`AnyExecutor`]'s `spawn`.
///
/// The wrapped executor only ever sees a `()`-output future; the real output
/// travels back through a channel.
pub struct AnyExecutorTask<T> {
    // Type-erased, `Send` task returned by the wrapped executor for the `()`-output wrapper future.
    inner: Pin<Box<dyn Task<()> + Send>>,
    // Receiving end of the channel on which the wrapper future sends the real output.
    receiver: Receiver<Result<T, Error>>,
}
499
500impl<T: Send> AnyExecutorTask<T> {
501 fn new(inner: Pin<Box<dyn Task<()> + Send>>, receiver: Receiver<Result<T, Error>>) -> Self {
503 Self { inner, receiver }
504 }
505
506 pub async fn result(self) -> Result<T, Error> {
511 <Self as Task<T>>::result(self).await
512 }
513
514 pub fn detach(self) {
516 <Self as Task<T>>::detach(self)
517 }
518}
519
520impl<T> core::fmt::Debug for AnyExecutorTask<T> {
521 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
522 f.debug_struct("AnyExecutorTask").finish_non_exhaustive()
523 }
524}
525
526impl<T: Send> Future for AnyExecutorTask<T> {
527 type Output = T;
528
529 fn poll(
530 self: Pin<&mut Self>,
531 cx: &mut core::task::Context<'_>,
532 ) -> core::task::Poll<Self::Output> {
533 self.poll_result(cx).map(|res| res.unwrap())
534 }
535}
536
537impl<T: Send> Task<T> for AnyExecutorTask<T> {
538 fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
539 let this = unsafe { self.get_unchecked_mut() };
541 let _ = this.inner.as_mut().poll(cx);
542
543 let mut recv = this.receiver.recv();
545 unsafe {
546 Pin::new_unchecked(&mut recv)
547 .poll(cx)
548 .map(|res| res.unwrap_or_else(|_| Err(Box::new("Channel closed"))))
549 }
550 }
551}
552
553impl Executor for AnyExecutor {
554 type Task<T: Send + 'static> = AnyExecutorTask<T>;
555
556 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
557 where
558 Fut: Future<Output: Send> + Send + 'static,
559 {
560 let (sender, receiver) = async_channel::bounded(1);
561 let fut = async move {
562 let res = AssertUnwindSafe(fut).await;
563 let _ = sender.send(Ok(res)).await;
564 };
565 let inner = self.0.spawn_boxed(Box::pin(fut));
566 AnyExecutorTask::new(inner, receiver)
567 }
568}
569
/// Object-safe counterpart of [`Executor`] used behind [`AnyExecutor`].
///
/// The generic `spawn` is replaced by a method taking a boxed `()`-output
/// future, so the trait can be used as `dyn AnyExecutorImpl`.
trait AnyExecutorImpl: Send + Sync + Any {
    /// Spawns an already boxed, pinned `Send` future and returns the resulting
    /// task as a boxed, pinned `Send` trait object.
    fn spawn_boxed(
        &self,
        fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> Pin<Box<dyn Task<()> + Send>>;
    /// Exposes the implementor as `&dyn Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any;
}
577
578impl<T: Task<T>> Task<T> for Pin<Box<T>> {
579 fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
580 let this = unsafe { self.get_unchecked_mut() };
581 this.as_mut().poll_result(cx)
582 }
583}
584
585impl<E> AnyExecutorImpl for E
586where
587 E: Executor + 'static,
588{
589 fn spawn_boxed(
590 &self,
591 fut: Pin<Box<dyn Future<Output = ()> + Send>>,
592 ) -> Pin<Box<dyn Task<()> + Send>> {
593 let task = self.spawn(fut);
594 Box::pin(task)
595 }
596
597 fn as_any(&self) -> &dyn Any {
598 self
599 }
600}
601
#[cfg(feature = "std")]
mod std_on {
    use alloc::boxed::Box;

    use crate::{
        AnyExecutor, AnyExecutorTask, AnyLocalExecutor, AnyLocalExecutorTask, Executor,
        LocalExecutor,
    };

    extern crate std;

    use core::{cell::OnceCell, future::Future, panic::AssertUnwindSafe};
    use std::sync::OnceLock;

    // Per-thread slot holding the local executor; it can be set once per thread.
    std::thread_local! {
        static LOCAL_EXECUTOR: OnceCell<AnyLocalExecutor> = const { OnceCell::new() };
    }

    /// Installs `executor` as the local executor of the current thread.
    ///
    /// # Panics
    ///
    /// Panics if a local executor has already been set for this thread.
    pub fn init_local_executor(executor: impl LocalExecutor + 'static) {
        let installed = try_init_local_executor(executor).is_ok();
        assert!(installed, "Local executor already set for this thread");
    }

    /// Tries to install `executor` as the local executor of the current thread.
    ///
    /// # Errors
    ///
    /// Gives `executor` back when this thread already has a local executor.
    pub fn try_init_local_executor<E>(executor: E) -> Result<(), E>
    where
        E: LocalExecutor + 'static,
    {
        LOCAL_EXECUTOR.with(|slot| match slot.set(AnyLocalExecutor::new(executor)) {
            Ok(()) => Ok(()),
            // The cell hands the rejected wrapper back; unwrap it to the caller's type.
            Err(rejected) => Err(*rejected.downcast::<E>().unwrap()),
        })
    }

    // Process-wide slot holding the global executor; it can be set once.
    static GLOBAL_EXECUTOR: OnceLock<AnyExecutor> = OnceLock::new();

    /// Installs `executor` as the global executor.
    ///
    /// # Panics
    ///
    /// Panics if a global executor has already been set.
    pub fn init_global_executor(executor: impl crate::Executor + 'static) {
        let installed = GLOBAL_EXECUTOR.set(AnyExecutor::new(executor)).is_ok();
        assert!(installed, "Global executor already set");
    }

    /// Tries to install `executor` as the global executor.
    ///
    /// # Errors
    ///
    /// Gives `executor` back when a global executor is already set.
    pub fn try_init_global_executor<E>(executor: E) -> Result<(), E>
    where
        E: crate::Executor + 'static,
    {
        match GLOBAL_EXECUTOR.set(AnyExecutor::new(executor)) {
            Ok(()) => Ok(()),
            // The lock hands the rejected wrapper back; unwrap it to the caller's type.
            Err(rejected) => Err(*rejected.downcast::<E>().unwrap()),
        }
    }

    /// Spawns `fut` on the global executor.
    ///
    /// # Panics
    ///
    /// Panics if no global executor has been set.
    pub fn spawn<Fut>(fut: Fut) -> AnyExecutorTask<Fut::Output>
    where
        Fut: Future<Output: Send> + Send + 'static,
    {
        let global = GLOBAL_EXECUTOR.get().expect("Global executor not set");
        <AnyExecutor as Executor>::spawn(global, fut)
    }

    /// Spawns `fut` on the current thread's local executor.
    ///
    /// # Panics
    ///
    /// Panics if no local executor has been set for this thread.
    pub fn spawn_local<Fut>(fut: Fut) -> AnyLocalExecutorTask<Fut::Output>
    where
        Fut: Future + 'static,
    {
        LOCAL_EXECUTOR.with(|slot| {
            let local = slot.get().expect("Local executor not set");
            <AnyLocalExecutor as LocalExecutor>::spawn_local(local, fut)
        })
    }

    /// Runs `f`, returning a panic raised inside it as `Err(payload)`.
    ///
    /// The closure is wrapped in `AssertUnwindSafe`, so no `UnwindSafe` bound is required.
    #[allow(unused)]
    pub(crate) fn catch_unwind<F, R>(f: F) -> Result<R, Box<dyn std::any::Any + Send>>
    where
        F: FnOnce() -> R,
    {
        let guarded = AssertUnwindSafe(f);
        std::panic::catch_unwind(guarded)
    }

    /// Zero-sized executor that forwards to the free functions [`spawn`] and
    /// [`spawn_local`], i.e. to whichever global / thread-local executor is installed.
    #[derive(Clone, Copy, Debug)]
    pub struct DefaultExecutor;

    impl Executor for DefaultExecutor {
        type Task<T: Send + 'static> = AnyExecutorTask<T>;

        /// Delegates to the module-level [`spawn`]; panics under the same conditions.
        fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
        where
            Fut: Future<Output: Send> + Send + 'static,
        {
            self::spawn(fut)
        }
    }

    impl LocalExecutor for DefaultExecutor {
        type Task<T: 'static> = AnyLocalExecutorTask<T>;

        /// Delegates to the module-level [`spawn_local`]; panics under the same conditions.
        fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
        where
            Fut: Future + 'static,
        {
            self::spawn_local(fut)
        }
    }
}
750
751#[cfg(feature = "std")]
752pub use std_on::*;