async_app/
lib.rs

1//! This crate provides ergonomic approach to implement applications spawning event loops.
2//!
3//! This covers applications using GUI & TUI (Screen rendering and input responding) /
4//! Network Services / Audio Services and so on.
5//! The `main` logic is implemented as an async function, and spawning event loops is
6//! implemented as `Future`s that after yielded, returns a proxy object that has access to the
7//! event loop. When yielded, the `main` future is also transfered and continue running on a new
8//! thread, leaving the original thread to run the event loop.
9//!
10//! To avoid pulling unnecessary dependencies, this crate doesn't provide any definitions for event
11//! loops, so you usually need to use the definition from another crate, usually with the name `async-app-*`
12//! 
13//! The final code will be like:
14//!
15//! ```rust,ignore
16//! use async_app_myeventloop::MyEventLoop;
17//!
18//! // This macro below expands to some boilerplate to run main future
19//! #[async_app::main]
20//! async fn main(mut scope: async_app::Scope) {
21//!     let proxy = scope
22//!         .fork_and_run_event_loop::<MyEventLoop>()
23//!         .await
24//!         .expect("Failed to spawn event loop");
25//!     // code below runs on the thread #2
26//!     proxy.do_necessary_configuration();
27//!     // thread #1's event loop is now modified with this configuration.
28//!     
29//!     // more application logic here
30//! 
31//!     // dropping `proxy` here will usually tell the event loop to quit somehow.
32//! }
33//!
34//! ```
35
36use std::cell::Cell;
37use std::future::Future;
38use std::mem;
39use std::ops::ControlFlow;
40use std::pin::Pin;
41use std::sync::Arc;
42use std::task::{self, Poll};
43use std::thread;
44
45/// The macro supplying startup boilerplate
46/// 
47/// Use it on async `main` function like this:
48/// ```rust,ignore
49/// #[async_app::main]
50/// async fn main(mut scope: async_app::Scope) {
51///     //...
52/// }
53/// ```
54pub use async_app_macros::main;
55
56/// The scope handle on which one can spawn event loops
57#[non_exhaustive]
58pub struct Scope {}
59
60/// Failing to spawn event loop, which rarely happens
61#[derive(Debug)]
62pub struct ForkFailError;
63
64impl Scope {
65    pub fn fork_and_run_event_loop<T: EventLoopDesc>(
66        &mut self,
67    ) -> impl Future<Output = Result<T::ProxyType, ForkFailError>> + Send {
68        self.fork_and_run_event_loop_impl(|| T::build_service_and_proxy())
69    }
70
71    pub fn fork_and_run_event_loop_with_arg<T: EventLoopWithArgsDesc>(
72        &mut self,
73        args: <T as EventLoopWithArgsDesc>::Args,
74    ) -> impl Future<Output = Result<T::ProxyType, ForkFailError>> + Send {
75        self.fork_and_run_event_loop_impl(|| T::build_service_and_proxy_with_args(args))
76    }
77
78    fn fork_and_run_event_loop_impl<R, F>(
79        &mut self,
80        f: F,
81    ) -> impl Future<Output = Result<R, ForkFailError>> + Send
82    where
83        R: Unpin + Send,
84        F: FnOnce() -> (Box<dyn FnOnce()>, R) + Unpin,
85    {
86        enum SpaceLeap<R, F>
87        where
88            F: FnOnce() -> (Box<dyn FnOnce()>, R),
89        {
90            Preparation(Option<fragile::Fragile<F>>),
91            Launched {
92                launching_thread: thread::ThreadId,
93                proxy_value: R,
94            },
95            Landed,
96            Aborted,
97        }
98
99        impl<R, F> Future for SpaceLeap<R, F>
100        where
101            R: Unpin,
102            F: FnOnce() -> (Box<dyn FnOnce()>, R) + Unpin,
103        {
104            type Output = Result<R, ForkFailError>;
105
106            fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
107                let this = self.get_mut();
108                match this {
109                    SpaceLeap::Preparation(f) => {
110                        let (service, proxy) = f
111                            .take()
112                            .expect("Try to take builder function multiple times")
113                            .into_inner()();
114                        EVENT_LOOP_CHOICE.set(EventLoopChoice::ForkAndRunEventLoop(service));
115                        *this = SpaceLeap::Launched {
116                            launching_thread: thread::current().id(),
117                            proxy_value: proxy,
118                        };
119                        Poll::Pending
120                    }
121                    SpaceLeap::Launched {
122                        launching_thread, ..
123                    } => {
124                        let current_thread_id = thread::current().id();
125                        if current_thread_id == *launching_thread {
126                            // suspicious wakening or aborted?
127                            if let Some(_failed) =
128                                EVENT_LOOP_CHOICE.take_filter(|cur_choice| match cur_choice {
129                                    EventLoopChoice::ForkFailureAndNeedRepoll => {
130                                        ControlFlow::Break(())
131                                    }
132                                    _ => ControlFlow::Continue(cur_choice),
133                                })
134                            {
135                                *this = SpaceLeap::Aborted;
136                                cx.waker().wake_by_ref();
137                                return Poll::Ready(Err(ForkFailError));
138                            }
139                            return Poll::Pending;
140                        }
141
142                        let old_value = mem::replace(this, SpaceLeap::Landed {});
143                        let SpaceLeap::Launched {
144                            proxy_value,
145                            launching_thread: _,
146                        } = old_value
147                        else {
148                            unreachable!()
149                        };
150                        cx.waker().wake_by_ref();
151                        Poll::Ready(Ok(proxy_value))
152                    }
153                    SpaceLeap::Landed => unreachable!(),
154                    SpaceLeap::Aborted => unreachable!(),
155                }
156            }
157        }
158        SpaceLeap::<R, F>::Preparation(Some(fragile::Fragile::new(f)))
159    }
160}
161
162/// Definition of a event loop with default arguments
163pub trait EventLoopDesc {
164    type ProxyType: Unpin + Send;
165
166    fn build_service_and_proxy() -> (Box<dyn FnOnce()>, Self::ProxyType);
167}
168
169/// Definition of a event loop with customized arguments
170pub trait EventLoopWithArgsDesc {
171    type Args: Unpin;
172    type ProxyType: Unpin + Send;
173
174    fn build_service_and_proxy_with_args(args: Self::Args) -> (Box<dyn FnOnce()>, Self::ProxyType);
175}
176
177impl<T> EventLoopDesc for T
178where
179    T: EventLoopWithArgsDesc,
180    <T as EventLoopWithArgsDesc>::Args: Default,
181{
182    type ProxyType = <T as EventLoopWithArgsDesc>::ProxyType;
183    fn build_service_and_proxy() -> (Box<dyn FnOnce()>, Self::ProxyType) {
184        T::build_service_and_proxy_with_args(Default::default())
185    }
186}
187
188thread_local! {
189    static EVENT_LOOP_CHOICE: Cell<EventLoopChoice> =
190        const { Cell::new(EventLoopChoice::Repoll) };
191}
192
193#[derive(Default)]
194enum EventLoopChoice {
195    #[default]
196    Repoll,
197    ForkAndRunEventLoop(Box<dyn FnOnce()>),
198    #[allow(dead_code)]
199    ForkFailureAndNeedRepoll,
200}
201
202#[doc(hidden)]
203pub fn entryscope() -> Scope {
204    Scope {}
205}
206
207struct UnparkWaker(thread::Thread);
208
209impl task::Wake for UnparkWaker {
210    fn wake(self: std::sync::Arc<Self>) {
211        self.0.unpark();
212    }
213
214    fn wake_by_ref(self: &std::sync::Arc<Self>) {
215        self.0.unpark();
216    }
217}
218
219#[doc(hidden)]
220pub fn entrypoint<R: Send + 'static>(
221    mut main_future: Pin<Box<impl Future<Output = R> + Send + 'static>>,
222) -> R {
223    let waker = task::Waker::from(Arc::new(UnparkWaker(thread::current())));
224    let mut ctx = task::Context::from_waker(&waker);
225    let event_loop_to_run = loop {
226        if let Poll::Ready(r) = main_future.as_mut().poll(&mut ctx) {
227            return r;
228        }
229
230        let Some(event_loop_to_run) =
231            EVENT_LOOP_CHOICE.take_filter(|cur_choice| match cur_choice {
232                EventLoopChoice::Repoll | EventLoopChoice::ForkFailureAndNeedRepoll => {
233                    ControlFlow::Continue(cur_choice)
234                }
235                EventLoopChoice::ForkAndRunEventLoop(f) => ControlFlow::Break(f),
236            })
237        else {
238            thread::park();
239            continue;
240        };
241
242        break event_loop_to_run;
243    };
244
245    // move to new thread
246    let waker = waker.clone();
247    let new_main_thread_join_handle = thread::Builder::new()
248        .spawn(move || {
249            waker.wake_by_ref();
250            entrypoint(main_future)
251        })
252        .expect("Failed to create thread");
253
254    event_loop_to_run();
255
256    new_main_thread_join_handle
257        .join()
258        .expect("Failed to join created thread")
259}
260
261use utils::TakeFilter;
262
263mod utils {
264    use std::cell::Cell;
265    use std::{ops::ControlFlow, thread};
266
267    pub(crate) trait TakeFilter<T> {
268        fn take_filter<R, F>(&'static self, f: F) -> Option<R>
269        where
270            F: FnOnce(T) -> ControlFlow<R, T>;
271    }
272
273    impl<T> TakeFilter<T> for thread::LocalKey<Cell<T>>
274    where
275        T: Default,
276    {
277        fn take_filter<R, F>(&'static self, f: F) -> Option<R>
278        where
279            F: FnOnce(T) -> ControlFlow<R, T>,
280        {
281            let v = f(self.take());
282            match v {
283                ControlFlow::Continue(v) => {
284                    self.set(v);
285                    None
286                }
287                ControlFlow::Break(r) => Some(r),
288            }
289        }
290    }
291}