1#![cfg(target_has_atomic = "ptr")] #![warn(missing_docs)]
6
7use crate::api::EventLoopError;
10use crate::SlintContext;
11use alloc::boxed::Box;
12use alloc::task::Wake;
13use alloc::vec::Vec;
14use core::future::Future;
15use core::ops::DerefMut;
16use core::pin::Pin;
17use core::task::Poll;
18use portable_atomic as atomic;
19
/// Progress of a spawned future.
enum FutureState<T> {
    /// The future has not completed yet; it is polled by `FutureRunner::wake`.
    Running(Pin<Box<dyn Future<Output = T>>>),
    /// The future is no longer polled.
    ///
    /// `Some(value)` holds the output until `JoinHandle::poll` takes it.
    /// `None` means the task was aborted before completing, or the output
    /// has already been taken.
    Finished(Option<T>),
}
24
/// Mutable part of a `FutureRunner`, accessed through `FutureRunner::inner`.
struct FutureRunnerInner<T> {
    /// Current state of the spawned future.
    fut: FutureState<T>,
    /// Wakers registered by `JoinHandle::poll` while the future was still
    /// running; they are woken once the future produces its output.
    wakers: Vec<core::task::Waker>,
}
29
/// Shared state of one spawned future. It is used both as the waker that
/// drives the future (see the `Wake` impl) and as the backing storage of the
/// `JoinHandle` returned to the caller.
struct FutureRunner<T> {
    /// Future state and registered wakers, behind a `RefCell` when the `std`
    /// feature is disabled.
    #[cfg(not(feature = "std"))]
    inner: core::cell::RefCell<FutureRunnerInner<T>>,
    /// Future state and registered wakers, behind a `Mutex` when the `std`
    /// feature is enabled.
    #[cfg(feature = "std")]
    inner: std::sync::Mutex<FutureRunnerInner<T>>,
    /// Set by `JoinHandle::abort`; checked by `wake` before each poll.
    aborted: atomic::AtomicBool,
    /// Proxy through which `wake` hands the polling closure to the event loop.
    proxy: Box<dyn crate::platform::EventLoopProxy>,
    /// Thread on which the runner was created; `wake` asserts that polling
    /// happens on this same thread.
    #[cfg(feature = "std")]
    thread: std::thread::ThreadId,
}
40
41impl<T> FutureRunner<T> {
42 fn inner(&self) -> impl DerefMut<Target = FutureRunnerInner<T>> + '_ {
43 #[cfg(feature = "std")]
44 return self.inner.lock().unwrap();
45 #[cfg(not(feature = "std"))]
46 return self.inner.borrow_mut();
47 }
48}
49
// SAFETY: the wrapped future is not required to be `Send`. With the `std`
// feature, `wake` asserts that the future is only ever polled on the thread
// recorded when the runner was created, and the produced value can only be
// taken through `JoinHandle`, which is `Send` only when `T: Send`.
// NOTE(review): without the `std` feature there is no thread check and the
// state sits in a `RefCell`; presumably those targets are single-threaded —
// confirm. Also confirm that the last reference to a still-running future
// cannot be dropped on a foreign thread.
#[allow(unsafe_code)]
unsafe impl<T> Send for FutureRunner<T> {}
// SAFETY: see the rationale on the `Send` impl directly above; shared access
// to the mutable state always goes through `FutureRunner::inner`.
#[allow(unsafe_code)]
unsafe impl<T> Sync for FutureRunner<T> {}
59
60impl<T: 'static> Wake for FutureRunner<T> {
61 fn wake(self: alloc::sync::Arc<Self>) {
62 self.clone().proxy.invoke_from_event_loop(Box::new(move || {
63 #[cfg(feature = "std")]
64 assert_eq!(self.thread, std::thread::current().id(), "the future was moved to a thread despite we checked it was created in the event loop thread");
65 let waker = self.clone().into();
66 let mut inner = self.inner();
67 let mut cx = core::task::Context::from_waker(&waker);
68 if let FutureState::Running(fut) = &mut inner.fut {
69 if self.aborted.load(atomic::Ordering::Relaxed) {
70 inner.fut = FutureState::Finished(None);
71 } else {
72 match fut.as_mut().poll(&mut cx) {
73 Poll::Ready(val) => {
74 inner.fut = FutureState::Finished(Some(val));
75 for w in core::mem::take(&mut inner.wakers) {
76 w.wake();
77 }
78 }
79 Poll::Pending => {}
80 }
81 }
82 }
83 }))
84 .expect("No event loop despite we checked");
85 }
86}
87
/// Handle to a future spawned on the event loop, as returned by
/// `spawn_local_with_ctx`.
///
/// Awaiting the handle yields the output of the spawned future. Polling it
/// again after it has returned its value, or after the task was aborted
/// before completing, panics.
pub struct JoinHandle<T>(alloc::sync::Arc<FutureRunner<T>>);
94
95impl<T> Future for JoinHandle<T> {
96 type Output = T;
97
98 fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
99 let mut inner = self.0.inner();
100 match &mut inner.fut {
101 FutureState::Running(_) => {
102 let waker = cx.waker();
103 if !inner.wakers.iter().any(|w| w.will_wake(waker)) {
104 inner.wakers.push(waker.clone());
105 }
106 Poll::Pending
107 }
108 FutureState::Finished(x) => {
109 Poll::Ready(x.take().expect("Polling completed or aborted JoinHandle"))
110 }
111 }
112 }
113}
114
115impl<T> JoinHandle<T> {
116 pub fn abort(self) {
120 self.0.aborted.store(true, atomic::Ordering::Relaxed);
121 }
122 pub fn is_finished(&self) -> bool {
124 matches!(self.0.inner().fut, FutureState::Finished(_))
125 }
126}
127
// SAFETY: a `JoinHandle` only holds an `Arc<FutureRunner<T>>`, which is
// declared `Send + Sync` in this file. The only `T` value reachable through
// the handle is the finished output taken in `poll`, hence the `T: Send`
// bound. The future itself is never polled through the handle.
// NOTE(review): confirm that dropping the handle on another thread can never
// drop a still-running, non-`Send` future.
#[cfg(feature = "std")]
#[allow(unsafe_code)]
unsafe impl<T: Send> Send for JoinHandle<T> {}
132
133pub(crate) fn spawn_local_with_ctx<F: Future + 'static>(
135 ctx: &SlintContext,
136 fut: F,
137) -> Result<JoinHandle<F::Output>, EventLoopError> {
138 let arc = alloc::sync::Arc::new(FutureRunner {
139 #[cfg(feature = "std")]
140 thread: std::thread::current().id(),
141 inner: FutureRunnerInner { fut: FutureState::Running(Box::pin(fut)), wakers: Vec::new() }
142 .into(),
143 aborted: Default::default(),
144 proxy: ctx.event_loop_proxy().ok_or(EventLoopError::NoEventLoopProvider)?,
145 });
146 arc.wake_by_ref();
147 Ok(JoinHandle(arc))
148}