1#![no_std]
37#![cfg_attr(docsrs, feature(doc_cfg))]
38#![warn(missing_docs, missing_debug_implementations)]
39
// Optional integration modules. Each declaration is compiled only when the
// Cargo feature named in its `cfg` attribute is enabled; the `doc(cfg(...))`
// attribute is applied only when the `docsrs` cfg is set.
#[cfg(feature = "async-task")]
#[cfg_attr(docsrs, doc(cfg(feature = "async-task")))]
pub mod async_task;

#[cfg(feature = "async-executor")]
#[cfg_attr(docsrs, doc(cfg(feature = "async-executor")))]
pub mod async_executor;

#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub mod tokio;
51
// Compiled only when the `smol` feature is enabled. The `doc(cfg(...))` label
// names the same feature as the `cfg` gate so the rendered documentation
// advertises the feature that actually enables this module.
#[cfg(feature = "smol")]
#[cfg_attr(docsrs, doc(cfg(feature = "smol")))]
pub mod smol;
55
56use core::{
57 any::Any,
58 fmt::Debug,
59 future::{Future, poll_fn},
60 marker::PhantomData,
61 panic::AssertUnwindSafe,
62 pin::Pin,
63 task::{Context, Poll},
64};
65
66pub mod mailbox;
67use alloc::boxed::Box;
68use async_channel::Receiver;
69
70extern crate alloc;
71
/// An executor that can spawn `Send` futures.
///
/// Implementors must themselves be `Send + Sync`.
pub trait Executor: Send + Sync {
    /// Handle returned by [`spawn`](Executor::spawn) for a future whose output is `T`.
    ///
    /// The handle implements [`Task`] and must be `Send`.
    type Task<T: Send + 'static>: Task<T> + Send;

    /// Hands `fut` to the executor and returns the task handle for its output.
    ///
    /// Both the future and its output must be `Send`, and the future must be
    /// `'static`.
    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
    where
        Fut: Future<Output: Send> + Send + 'static;
}
97
98impl<E: Executor> Executor for &E {
99 type Task<T: Send + 'static> = E::Task<T>;
100
101 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
102 where
103 Fut: Future<Output: Send> + Send + 'static,
104 {
105 (*self).spawn(fut)
106 }
107}
108
109impl<E: Executor> Executor for &mut E {
110 type Task<T: Send + 'static> = E::Task<T>;
111
112 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
113 where
114 Fut: Future<Output: Send> + Send + 'static,
115 {
116 (**self).spawn(fut)
117 }
118}
119
120impl<E: Executor> Executor for Box<E> {
121 type Task<T: Send + 'static> = E::Task<T>;
122
123 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
124 where
125 Fut: Future<Output: Send> + Send + 'static,
126 {
127 (**self).spawn(fut)
128 }
129}
130
131impl<E: Executor> Executor for alloc::sync::Arc<E> {
132 type Task<T: Send + 'static> = E::Task<T>;
133
134 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
135 where
136 Fut: Future<Output: Send> + Send + 'static,
137 {
138 (**self).spawn(fut)
139 }
140}
141
/// An executor that can spawn futures without requiring them to be `Send`.
pub trait LocalExecutor {
    /// Handle returned by [`spawn_local`](LocalExecutor::spawn_local) for a
    /// future whose output is `T`.
    ///
    /// The handle implements [`Task`].
    type Task<T: 'static>: Task<T>;

    /// Hands `fut` to the executor and returns the task handle for its output.
    ///
    /// The future must be `'static`; no `Send` bound is required.
    fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
    where
        Fut: Future + 'static;
}
167
168impl<E: LocalExecutor> LocalExecutor for &E {
169 type Task<T: 'static> = E::Task<T>;
170
171 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
172 where
173 Fut: Future + 'static,
174 {
175 (*self).spawn_local(fut)
176 }
177}
178
179impl<E: LocalExecutor> LocalExecutor for &mut E {
180 type Task<T: 'static> = E::Task<T>;
181
182 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
183 where
184 Fut: Future + 'static,
185 {
186 (**self).spawn_local(fut)
187 }
188}
189
190impl<E: LocalExecutor> LocalExecutor for Box<E> {
191 type Task<T: 'static> = E::Task<T>;
192
193 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
194 where
195 Fut: Future + 'static,
196 {
197 (**self).spawn_local(fut)
198 }
199}
200
201impl<E: LocalExecutor> LocalExecutor for alloc::rc::Rc<E> {
202 type Task<T: 'static> = E::Task<T>;
203
204 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
205 where
206 Fut: Future + 'static,
207 {
208 (**self).spawn_local(fut)
209 }
210}
211
212impl<E: LocalExecutor> LocalExecutor for alloc::sync::Arc<E> {
213 type Task<T: 'static> = E::Task<T>;
214
215 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
216 where
217 Fut: Future + 'static,
218 {
219 (**self).spawn_local(fut)
220 }
221}
222
/// Object-safe counterpart of [`LocalExecutor`], used as the trait object
/// stored inside [`AnyLocalExecutor`].
trait AnyLocalExecutorImpl: 'static + Any {
    /// Spawns an already boxed future with unit output and returns a boxed,
    /// type-erased task handle for it.
    fn spawn_local_boxed(
        &self,
        fut: Pin<Box<dyn Future<Output = ()>>>,
    ) -> Pin<Box<dyn Task<()> + 'static>>;
    /// Exposes `self` as [`Any`] so the concrete executor type can be
    /// recovered by downcasting.
    fn as_any(&self) -> &dyn Any;
}
230
231impl<E> AnyLocalExecutorImpl for E
232where
233 E: LocalExecutor + 'static,
234{
235 fn spawn_local_boxed(
236 &self,
237 fut: Pin<Box<dyn Future<Output = ()>>>,
238 ) -> Pin<Box<dyn Task<()> + 'static>> {
239 let task = self.spawn_local(fut);
240 Box::pin(task)
241 }
242
243 fn as_any(&self) -> &dyn Any {
244 self
245 }
246}
247
/// A type-erased [`LocalExecutor`].
///
/// Holds the wrapped executor as a boxed trait object.
pub struct AnyLocalExecutor(Box<dyn AnyLocalExecutorImpl>);
253
254impl Debug for AnyLocalExecutor {
255 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
256 f.debug_struct("AnyLocalExecutor").finish()
257 }
258}
259
260impl Debug for AnyExecutor {
261 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
262 f.debug_struct("AnyExecutor").finish()
263 }
264}
265
266impl<T> dyn Task<T> {
267 pub async fn result(self: Box<Self>) -> Result<T, Error> {
272 let mut pinned: Pin<Box<Self>> = self.into();
273
274 poll_fn(move |cx| pinned.as_mut().poll_result(cx)).await
275 }
276}
277
278impl AnyExecutor {
279 pub fn new(executor: impl Executor + 'static) -> Self {
281 Self(Box::new(executor))
282 }
283
284 pub fn downcast_ref<E: Executor + 'static>(&self) -> Option<&E> {
288 self.0.as_any().downcast_ref()
289 }
290
291 pub fn downcast<E: Executor + 'static>(self) -> Result<Box<E>, Self> {
296 if let Some(executor) = self.0.as_any().downcast_ref::<E>() {
297 let ptr: *const E = executor;
298 let boxed: Box<E> = unsafe { Box::from_raw(ptr as *mut E) };
300 core::mem::forget(self);
301 Ok(boxed)
302 } else {
303 Err(self)
304 }
305 }
306}
307
308impl AnyLocalExecutor {
309 pub fn new(executor: impl LocalExecutor + 'static) -> Self {
311 Self(Box::new(executor))
312 }
313
314 pub fn downcast_ref<E: LocalExecutor + 'static>(&self) -> Option<&E> {
318 self.0.as_any().downcast_ref()
319 }
320
321 pub fn downcast<E: LocalExecutor + 'static>(self) -> Result<Box<E>, Self> {
326 if let Some(executor) = self.0.as_any().downcast_ref::<E>() {
327 let ptr: *const E = executor;
328 let boxed: Box<E> = unsafe { Box::from_raw(ptr as *mut E) };
330 core::mem::forget(self);
331 Ok(boxed)
332 } else {
333 Err(self)
334 }
335 }
336}
337
/// Task handle produced by [`AnyLocalExecutor`]'s `spawn_local`.
pub struct AnyLocalExecutorTask<T> {
    // Type-erased handle of the task spawned on the wrapped executor.
    inner: Pin<Box<dyn Task<()> + 'static>>,
    // Receiving end of the channel on which the spawned wrapper future sends
    // the user future's output.
    receiver: Receiver<Result<T, Error>>,
}
347
348impl<T> AnyLocalExecutorTask<T> {
349 fn new(inner: Pin<Box<dyn Task<()> + 'static>>, receiver: Receiver<Result<T, Error>>) -> Self {
351 Self { inner, receiver }
352 }
353
354 pub async fn result(self) -> Result<T, Error> {
356 <Self as Task<T>>::result(self).await
357 }
358
359 pub fn detach(self) {
361 <Self as Task<T>>::detach(self)
362 }
363}
364
365impl<T> core::fmt::Debug for AnyLocalExecutorTask<T> {
366 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
367 f.debug_struct("AnyLocalExecutorTask")
368 .finish_non_exhaustive()
369 }
370}
371
372impl<T> Future for AnyLocalExecutorTask<T> {
373 type Output = T;
374
375 fn poll(
376 self: Pin<&mut Self>,
377 cx: &mut core::task::Context<'_>,
378 ) -> core::task::Poll<Self::Output> {
379 self.poll_result(cx).map(|res| res.unwrap())
380 }
381}
382
383impl<T> Task<T> for AnyLocalExecutorTask<T> {
384 fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
385 let this = unsafe { self.get_unchecked_mut() };
387 let _ = this.inner.as_mut().poll(cx);
388
389 let mut recv = this.receiver.recv();
391 unsafe {
392 Pin::new_unchecked(&mut recv)
393 .poll(cx)
394 .map(|res| res.unwrap_or_else(|_| Err(Box::new("Channel closed"))))
395 }
396 }
397}
398
399impl LocalExecutor for AnyLocalExecutor {
400 type Task<T: 'static> = AnyLocalExecutorTask<T>;
401
402 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
403 where
404 Fut: Future + 'static,
405 {
406 let (sender, receiver) = async_channel::bounded(1);
407 let fut = async move {
408 let res = AssertUnwindSafe(fut).await;
409 let _ = sender.send(Ok(res)).await;
410 };
411 let inner = self.0.spawn_local_boxed(Box::pin(fut));
412 AnyLocalExecutorTask::new(inner, receiver)
413 }
414}
415
/// Boxed, dynamically typed error payload carried by task results.
type Error = Box<dyn Any + Send>;
420
421pub trait Task<T>: Future<Output = T> {
429 fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>>;
434
435 fn result(self) -> impl Future<Output = Result<T, Error>>
441 where
442 Self: Sized,
443 {
444 ResultFuture {
445 task: self,
446 _phantom: PhantomData,
447 }
448 }
449
450 fn detach(self)
456 where
457 Self: Sized,
458 {
459 core::mem::forget(self);
460 }
461}
462
/// Future built by the default [`Task::result`] implementation.
///
/// Resolves to the wrapped task's `Result` by calling its `poll_result`.
pub struct ResultFuture<T: Task<U>, U> {
    // The task being driven.
    task: T,
    // Ties the task's output type `U` to this struct without storing a `U`.
    _phantom: PhantomData<U>,
}
471
472impl<T: Task<U>, U> core::fmt::Debug for ResultFuture<T, U> {
473 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
474 f.debug_struct("ResultFuture").finish_non_exhaustive()
475 }
476}
477
478impl<T: Task<U>, U> Future for ResultFuture<T, U> {
479 type Output = Result<U, Error>;
480
481 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
482 let this = unsafe { self.get_unchecked_mut() };
483 unsafe { Pin::new_unchecked(&mut this.task) }.poll_result(cx)
484 }
485}
486
/// A type-erased [`Executor`].
///
/// Holds the wrapped executor as a boxed trait object.
pub struct AnyExecutor(Box<dyn AnyExecutorImpl>);
493
/// Task handle produced by [`AnyExecutor`]'s `spawn`.
pub struct AnyExecutorTask<T> {
    // Type-erased, `Send` handle of the task spawned on the wrapped executor.
    inner: Pin<Box<dyn Task<()> + Send>>,
    // Receiving end of the channel on which the spawned wrapper future sends
    // the user future's output.
    receiver: Receiver<Result<T, Error>>,
}
502
503impl<T: Send> AnyExecutorTask<T> {
504 fn new(inner: Pin<Box<dyn Task<()> + Send>>, receiver: Receiver<Result<T, Error>>) -> Self {
506 Self { inner, receiver }
507 }
508
509 pub async fn result(self) -> Result<T, Error> {
514 <Self as Task<T>>::result(self).await
515 }
516
517 pub fn detach(self) {
519 <Self as Task<T>>::detach(self)
520 }
521}
522
523impl<T> core::fmt::Debug for AnyExecutorTask<T> {
524 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
525 f.debug_struct("AnyExecutorTask").finish_non_exhaustive()
526 }
527}
528
529impl<T: Send> Future for AnyExecutorTask<T> {
530 type Output = T;
531
532 fn poll(
533 self: Pin<&mut Self>,
534 cx: &mut core::task::Context<'_>,
535 ) -> core::task::Poll<Self::Output> {
536 self.poll_result(cx).map(|res| res.unwrap())
537 }
538}
539
540impl<T: Send> Task<T> for AnyExecutorTask<T> {
541 fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
542 let this = unsafe { self.get_unchecked_mut() };
544 let _ = this.inner.as_mut().poll(cx);
545
546 let mut recv = this.receiver.recv();
548 unsafe {
549 Pin::new_unchecked(&mut recv)
550 .poll(cx)
551 .map(|res| res.unwrap_or_else(|_| Err(Box::new("Channel closed"))))
552 }
553 }
554}
555
556impl Executor for AnyExecutor {
557 type Task<T: Send + 'static> = AnyExecutorTask<T>;
558
559 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
560 where
561 Fut: Future<Output: Send> + Send + 'static,
562 {
563 let (sender, receiver) = async_channel::bounded(1);
564 let fut = async move {
565 let res = AssertUnwindSafe(fut).await;
566 let _ = sender.send(Ok(res)).await;
567 };
568 let inner = self.0.spawn_boxed(Box::pin(fut));
569 AnyExecutorTask::new(inner, receiver)
570 }
571}
572
/// Object-safe counterpart of [`Executor`], used as the trait object stored
/// inside [`AnyExecutor`].
trait AnyExecutorImpl: Send + Sync + Any {
    /// Spawns an already boxed `Send` future with unit output and returns a
    /// boxed, type-erased task handle for it.
    fn spawn_boxed(
        &self,
        fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> Pin<Box<dyn Task<()> + Send>>;
    /// Exposes `self` as [`Any`] so the concrete executor type can be
    /// recovered by downcasting.
    fn as_any(&self) -> &dyn Any;
}
580
581impl<T: Task<T>> Task<T> for Pin<Box<T>> {
582 fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
583 let this = unsafe { self.get_unchecked_mut() };
584 this.as_mut().poll_result(cx)
585 }
586}
587
588impl<E> AnyExecutorImpl for E
589where
590 E: Executor + 'static,
591{
592 fn spawn_boxed(
593 &self,
594 fut: Pin<Box<dyn Future<Output = ()> + Send>>,
595 ) -> Pin<Box<dyn Task<()> + Send>> {
596 let task = self.spawn(fut);
597 Box::pin(task)
598 }
599
600 fn as_any(&self) -> &dyn Any {
601 self
602 }
603}
604
// Default executors backed by `std`: one process-wide executor and one
// executor per thread, plus free functions that spawn onto them.
#[cfg(feature = "std")]
mod std_on {
    use alloc::boxed::Box;

    use crate::{
        AnyExecutor, AnyExecutorTask, AnyLocalExecutor, AnyLocalExecutorTask, Executor,
        LocalExecutor,
    };

    extern crate std;

    use core::{cell::OnceCell, future::Future, panic::AssertUnwindSafe};
    use std::sync::OnceLock;
    std::thread_local! {
        // Per-thread slot for the local executor; it can be filled only once.
        static LOCAL_EXECUTOR: OnceCell<AnyLocalExecutor> = const { OnceCell::new() };
    }

    /// Installs `executor` as the calling thread's local executor.
    ///
    /// # Panics
    ///
    /// Panics if a local executor has already been set for this thread.
    pub fn init_local_executor(executor: impl LocalExecutor + 'static) {
        let already_set = try_init_local_executor(executor).is_err();
        if already_set {
            panic!("Local executor already set for this thread");
        }
    }

    /// Tries to install `executor` as the calling thread's local executor.
    ///
    /// # Errors
    ///
    /// Hands `executor` back if this thread already has a local executor.
    pub fn try_init_local_executor<E>(executor: E) -> Result<(), E>
    where
        E: LocalExecutor + 'static,
    {
        LOCAL_EXECUTOR.with(|cell| match cell.set(AnyLocalExecutor::new(executor)) {
            Ok(()) => Ok(()),
            // The rejected wrapper was built from an `E` just above, so the
            // downcast back to `E` is expected to succeed.
            Err(rejected) => Err(*rejected.downcast::<E>().unwrap()),
        })
    }

    // Process-wide slot for the global executor; it can be filled only once.
    static GLOBAL_EXECUTOR: OnceLock<AnyExecutor> = OnceLock::new();

    /// Installs `executor` as the process-wide global executor.
    ///
    /// # Panics
    ///
    /// Panics if a global executor has already been set.
    pub fn init_global_executor(executor: impl crate::Executor + 'static) {
        let already_set = GLOBAL_EXECUTOR.set(AnyExecutor::new(executor)).is_err();
        if already_set {
            panic!("Global executor already set");
        }
    }

    /// Tries to install `executor` as the process-wide global executor.
    ///
    /// # Errors
    ///
    /// Hands `executor` back if a global executor has already been set.
    pub fn try_init_global_executor<E>(executor: E) -> Result<(), E>
    where
        E: crate::Executor + 'static,
    {
        match GLOBAL_EXECUTOR.set(AnyExecutor::new(executor)) {
            Ok(()) => Ok(()),
            // The rejected wrapper was built from an `E` just above, so the
            // downcast back to `E` is expected to succeed.
            Err(rejected) => Err(*rejected.downcast::<E>().unwrap()),
        }
    }

    /// Spawns `fut` on the global executor.
    ///
    /// # Panics
    ///
    /// Panics if no global executor has been set.
    pub fn spawn<Fut>(fut: Fut) -> AnyExecutorTask<Fut::Output>
    where
        Fut: Future<Output: Send> + Send + 'static,
    {
        GLOBAL_EXECUTOR
            .get()
            .expect("Global executor not set")
            .spawn(fut)
    }

    /// Spawns `fut` on the calling thread's local executor.
    ///
    /// # Panics
    ///
    /// Panics if no local executor has been set for this thread.
    pub fn spawn_local<Fut>(fut: Fut) -> AnyLocalExecutorTask<Fut::Output>
    where
        Fut: Future + 'static,
    {
        LOCAL_EXECUTOR.with(|cell| {
            cell.get()
                .expect("Local executor not set")
                .spawn_local(fut)
        })
    }

    /// Runs `f`, turning an unwinding panic into `Err` with the panic payload.
    // Not called inside this module; presumably used by feature-gated code
    // elsewhere in the crate, hence the lint allowance (TODO confirm).
    #[allow(unused)]
    pub(crate) fn catch_unwind<F, R>(f: F) -> Result<R, Box<dyn std::any::Any + Send>>
    where
        F: FnOnce() -> R,
    {
        let guarded = AssertUnwindSafe(f);
        std::panic::catch_unwind(guarded)
    }

    /// Zero-sized executor that forwards to the free [`spawn`] and
    /// [`spawn_local`] functions of this module.
    #[derive(Clone, Copy, Debug)]
    pub struct DefaultExecutor;

    impl Executor for DefaultExecutor {
        type Task<T: Send + 'static> = AnyExecutorTask<T>;

        fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
        where
            Fut: Future<Output: Send> + Send + 'static,
        {
            // The free function of this module, i.e. the global executor.
            spawn(fut)
        }
    }

    impl LocalExecutor for DefaultExecutor {
        type Task<T: 'static> = AnyLocalExecutorTask<T>;

        fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
        where
            Fut: Future + 'static,
        {
            // The free function of this module, i.e. this thread's local executor.
            spawn_local(fut)
        }
    }
}
753
754#[cfg(feature = "std")]
755pub use std_on::*;