use std::cell::Cell;
use std::fmt;
use std::future::Future;
use std::mem;
use std::ops::ControlFlow;
use std::panic;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{self, Poll};
use std::thread;

pub use async_app_macros::main;

/// Handle through which a future asks for an event loop to be forked off.
///
/// Values are produced by [`entryscope`]. The type is `#[non_exhaustive]`, so
/// code outside this crate cannot build one with a struct literal.
#[derive(Debug)]
#[non_exhaustive]
pub struct Scope {}

/// Error produced by the futures returned from `Scope::fork_and_run_event_loop`
/// and `Scope::fork_and_run_event_loop_with_arg` when the fork is reported as
/// failed.
#[derive(Debug)]
pub struct ForkFailError;

impl fmt::Display for ForkFailError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("failed to fork and run the event loop")
    }
}

// Lets callers propagate the failure through `Box<dyn Error>` and `?`.
impl std::error::Error for ForkFailError {}

64impl Scope {
65 pub fn fork_and_run_event_loop<T: EventLoopDesc>(
66 &mut self,
67 ) -> impl Future<Output = Result<T::ProxyType, ForkFailError>> + Send {
68 self.fork_and_run_event_loop_impl(|| T::build_service_and_proxy())
69 }
70
71 pub fn fork_and_run_event_loop_with_arg<T: EventLoopWithArgsDesc>(
72 &mut self,
73 args: <T as EventLoopWithArgsDesc>::Args,
74 ) -> impl Future<Output = Result<T::ProxyType, ForkFailError>> + Send {
75 self.fork_and_run_event_loop_impl(|| T::build_service_and_proxy_with_args(args))
76 }
77
78 fn fork_and_run_event_loop_impl<R, F>(
79 &mut self,
80 f: F,
81 ) -> impl Future<Output = Result<R, ForkFailError>> + Send
82 where
83 R: Unpin + Send,
84 F: FnOnce() -> (Box<dyn FnOnce()>, R) + Unpin,
85 {
86 enum SpaceLeap<R, F>
87 where
88 F: FnOnce() -> (Box<dyn FnOnce()>, R),
89 {
90 Preparation(Option<fragile::Fragile<F>>),
91 Launched {
92 launching_thread: thread::ThreadId,
93 proxy_value: R,
94 },
95 Landed,
96 Aborted,
97 }
98
99 impl<R, F> Future for SpaceLeap<R, F>
100 where
101 R: Unpin,
102 F: FnOnce() -> (Box<dyn FnOnce()>, R) + Unpin,
103 {
104 type Output = Result<R, ForkFailError>;
105
106 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
107 let this = self.get_mut();
108 match this {
109 SpaceLeap::Preparation(f) => {
110 let (service, proxy) = f
111 .take()
112 .expect("Try to take builder function multiple times")
113 .into_inner()();
114 EVENT_LOOP_CHOICE.set(EventLoopChoice::ForkAndRunEventLoop(service));
115 *this = SpaceLeap::Launched {
116 launching_thread: thread::current().id(),
117 proxy_value: proxy,
118 };
119 Poll::Pending
120 }
121 SpaceLeap::Launched {
122 launching_thread, ..
123 } => {
124 let current_thread_id = thread::current().id();
125 if current_thread_id == *launching_thread {
126 if let Some(_failed) =
128 EVENT_LOOP_CHOICE.take_filter(|cur_choice| match cur_choice {
129 EventLoopChoice::ForkFailureAndNeedRepoll => {
130 ControlFlow::Break(())
131 }
132 _ => ControlFlow::Continue(cur_choice),
133 })
134 {
135 *this = SpaceLeap::Aborted;
136 cx.waker().wake_by_ref();
137 return Poll::Ready(Err(ForkFailError));
138 }
139 return Poll::Pending;
140 }
141
142 let old_value = mem::replace(this, SpaceLeap::Landed {});
143 let SpaceLeap::Launched {
144 proxy_value,
145 launching_thread: _,
146 } = old_value
147 else {
148 unreachable!()
149 };
150 cx.waker().wake_by_ref();
151 Poll::Ready(Ok(proxy_value))
152 }
153 SpaceLeap::Landed => unreachable!(),
154 SpaceLeap::Aborted => unreachable!(),
155 }
156 }
157 }
158 SpaceLeap::<R, F>::Preparation(Some(fragile::Fragile::new(f)))
159 }
160}
161
/// Describes an event loop that can be built without arguments.
pub trait EventLoopDesc {
    /// Value the forking future resolves to on success.
    type ProxyType: Unpin + Send;

    /// Builds the `(service, proxy)` pair.
    ///
    /// `service` is the boxed closure that gets run as the event loop;
    /// `proxy` is handed back to the future that requested the fork.
    fn build_service_and_proxy() -> (Box<dyn FnOnce()>, Self::ProxyType);
}

/// Describes an event loop whose construction takes an argument value.
pub trait EventLoopWithArgsDesc {
    /// Argument consumed by [`Self::build_service_and_proxy_with_args`].
    type Args: Unpin;
    /// Value the forking future resolves to on success.
    type ProxyType: Unpin + Send;

    /// Builds the `(service, proxy)` pair from `args`.
    ///
    /// `service` is the boxed closure that gets run as the event loop;
    /// `proxy` is handed back to the future that requested the fork.
    fn build_service_and_proxy_with_args(args: Self::Args) -> (Box<dyn FnOnce()>, Self::ProxyType);
}

177impl<T> EventLoopDesc for T
178where
179 T: EventLoopWithArgsDesc,
180 <T as EventLoopWithArgsDesc>::Args: Default,
181{
182 type ProxyType = <T as EventLoopWithArgsDesc>::ProxyType;
183 fn build_service_and_proxy() -> (Box<dyn FnOnce()>, Self::ProxyType) {
184 T::build_service_and_proxy_with_args(Default::default())
185 }
186}
187
188thread_local! {
189 static EVENT_LOOP_CHOICE: Cell<EventLoopChoice> =
190 const { Cell::new(EventLoopChoice::Repoll) };
191}
192
/// What `entrypoint` should do after the main future returned `Pending`.
#[derive(Default)]
enum EventLoopChoice {
    /// Nothing was requested: park until woken, then poll again.
    #[default]
    Repoll,
    /// Run the boxed closure as the event loop on the current thread.
    ForkAndRunEventLoop(Box<dyn FnOnce()>),
    /// Signals a failed fork to the waiting future.
    // Matched on but never constructed in this file, hence the lint
    // suppression.
    #[allow(dead_code)]
    ForkFailureAndNeedRepoll,
}

202#[doc(hidden)]
203pub fn entryscope() -> Scope {
204 Scope {}
205}
206
/// Waker that unparks the thread captured at construction time.
struct UnparkWaker(thread::Thread);

impl task::Wake for UnparkWaker {
    /// Unparks the captured thread, consuming this handle.
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }

    /// Unparks the captured thread without consuming the handle.
    fn wake_by_ref(self: &Arc<Self>) {
        self.0.unpark();
    }
}

219#[doc(hidden)]
220pub fn entrypoint<R: Send + 'static>(
221 mut main_future: Pin<Box<impl Future<Output = R> + Send + 'static>>,
222) -> R {
223 let waker = task::Waker::from(Arc::new(UnparkWaker(thread::current())));
224 let mut ctx = task::Context::from_waker(&waker);
225 let event_loop_to_run = loop {
226 if let Poll::Ready(r) = main_future.as_mut().poll(&mut ctx) {
227 return r;
228 }
229
230 let Some(event_loop_to_run) =
231 EVENT_LOOP_CHOICE.take_filter(|cur_choice| match cur_choice {
232 EventLoopChoice::Repoll | EventLoopChoice::ForkFailureAndNeedRepoll => {
233 ControlFlow::Continue(cur_choice)
234 }
235 EventLoopChoice::ForkAndRunEventLoop(f) => ControlFlow::Break(f),
236 })
237 else {
238 thread::park();
239 continue;
240 };
241
242 break event_loop_to_run;
243 };
244
245 let waker = waker.clone();
247 let new_main_thread_join_handle = thread::Builder::new()
248 .spawn(move || {
249 waker.wake_by_ref();
250 entrypoint(main_future)
251 })
252 .expect("Failed to create thread");
253
254 event_loop_to_run();
255
256 new_main_thread_join_handle
257 .join()
258 .expect("Failed to join created thread")
259}
260
use utils::TakeFilter;

/// Small helpers local to this crate.
mod utils {
    use std::cell::Cell;
    use std::{ops::ControlFlow, thread};

    /// "Take, inspect, maybe put back" access to a stored value of type `T`.
    pub(crate) trait TakeFilter<T> {
        /// Moves the stored value out and hands it to `f`.
        ///
        /// * `ControlFlow::Continue(v)`: `v` is stored back and `None` is
        ///   returned.
        /// * `ControlFlow::Break(r)`: nothing is stored back and `Some(r)` is
        ///   returned.
        fn take_filter<R, F>(&'static self, f: F) -> Option<R>
        where
            F: FnOnce(T) -> ControlFlow<R, T>;
    }

    impl<T> TakeFilter<T> for thread::LocalKey<Cell<T>>
    where
        T: Default,
    {
        /// While `f` runs, and after a `Break`, the cell holds `T::default()`.
        fn take_filter<R, F>(&'static self, f: F) -> Option<R>
        where
            F: FnOnce(T) -> ControlFlow<R, T>,
        {
            match f(self.take()) {
                ControlFlow::Continue(kept) => {
                    self.set(kept);
                    None
                }
                ControlFlow::Break(extracted) => Some(extracted),
            }
        }
    }
}