aeth_task/
spawner.rs

1use futures::FutureExt;
2use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
3use futures::executor::{LocalPool, LocalSpawner};
4use futures::future::{BoxFuture, RemoteHandle};
5use futures::lock::Mutex;
6use futures::task::{LocalSpawnExt, SpawnExt};
7use std::cell::RefCell;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use tokio::runtime::Runtime;
11
12/// Task handle trait.
13///
14/// This handle is used to receive result from the
15/// task, as well as controlling the task like
16/// cancelling and detaching it:
17///
18/// - Awaiting this handle receives the result.
19/// - Dropping this handle cancels the task.
20/// - Calling `detach` consumes and detaches the task.
21pub trait Handle<T>: Future<Output = T>
22where
23    T: 'static,
24{
25    fn detach(self);
26}
27
28struct TaskHandle<T>
29where
30    T: 'static,
31{
32    handle: RemoteHandle<T>,
33}
34
35impl<T> TaskHandle<T>
36where
37    T: 'static,
38{
39    pub fn new(handle: RemoteHandle<T>) -> Self {
40        Self { handle }
41    }
42}
43
44impl<T> Future for TaskHandle<T>
45where
46    T: 'static,
47{
48    type Output = T;
49    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
50        self.handle.poll_unpin(cx)
51    }
52}
53
54impl<T> Handle<T> for TaskHandle<T>
55where
56    T: 'static,
57{
58    fn detach(self) {
59        self.handle.forget();
60    }
61}
62
63pub(crate) struct ForegroundSpawner {
64    pub(crate) local_pool: RefCell<LocalPool>,
65    pub(crate) local_spawner: LocalSpawner,
66    pub(crate) tokio_runtime: Runtime,
67    pub(crate) _loopback_send: UnboundedSender<BoxFuture<'static, ()>>,
68    pub(crate) loopback_recv: Mutex<UnboundedReceiver<BoxFuture<'static, ()>>>,
69}
70
71impl ForegroundSpawner {
72    fn spawn_foreground<F, T>(&self, future: F) -> TaskHandle<T>
73    where
74        F: Future<Output = T> + 'static,
75        T: 'static,
76    {
77        let handle = self.local_spawner.spawn_local_with_handle(future).unwrap();
78        TaskHandle::new(handle)
79    }
80
81    fn spawn_background<F, T>(&self, future: F) -> TaskHandle<T>
82    where
83        F: Future<Output = T> + Send + 'static,
84        T: Send + 'static,
85    {
86        // XXX: The foreground thread does not have tokio
87        // context initialized, so we will need to use
88        // the tokio_runtime handle to create the task.
89        let (remote, handle) = future.remote_handle();
90        let _ = self.tokio_runtime.spawn(remote);
91        TaskHandle::new(handle)
92    }
93
94    fn drain_foreground_loopback(&self) -> Option<()> {
95        let mut loopback_recv = self.loopback_recv.try_lock()?;
96        loop {
97            self.local_spawner
98                .spawn(loopback_recv.try_next().ok()??)
99                .unwrap();
100        }
101    }
102
103    fn run_foreground(&self) {
104        // XXX: This is okay since there's only one
105        // foreground thread.
106        self.local_pool.borrow_mut().run_until_stalled();
107    }
108}
109
110pub(crate) struct BackgroundSpawner {
111    pub(crate) loopback_send: UnboundedSender<BoxFuture<'static, ()>>,
112}
113
114impl BackgroundSpawner {
115    fn spawn_background<F, T>(&self, future: F) -> TaskHandle<T>
116    where
117        F: Future<Output = T> + Send + 'static,
118        T: Send + 'static,
119    {
120        // XXX: Every background thread will enter the
121        // tokio context, so we can use tokio::spawn.
122        let (remote, handle) = future.remote_handle();
123        let _ = tokio::spawn(remote);
124        TaskHandle::new(handle)
125    }
126
127    fn spawn_foreground<F, T>(&self, future: F) -> TaskHandle<T>
128    where
129        F: Future<Output = T> + Send + 'static,
130        T: Send + 'static,
131    {
132        let (remote, handle) = future.remote_handle();
133        // XXX: note that the main thread holds one copy
134        // of the sender and receiver, so there'll never
135        // be an error sending it.
136        self.loopback_send.unbounded_send(Box::pin(remote)).unwrap();
137        TaskHandle::new(handle)
138    }
139}
140
141/// Enumeration of the task / thread types.
142///
143/// Since we have a well-defined primitives of creating
144/// tasks of different kind and using async / await
145/// language to control them, for convenience, each task
146/// must only be associated with one type, and each
147/// thread is only allowed to execute one type of task,
148/// so there's no ambiguity while querying the type.
149#[derive(PartialEq, Eq, Clone, Copy, Debug)]
150pub enum Type {
151    Uninit,
152    Foreground,
153    Background,
154}
155
156pub(crate) enum Spawner {
157    Uninit,
158    Foreground(ForegroundSpawner),
159    Background(BackgroundSpawner),
160}
161
162thread_local! {
163    pub(crate) static SPAWNER: RefCell<Spawner> = RefCell::new(Spawner::Uninit);
164}
165
166impl Spawner {
167    fn panic_uninit<T>(&self) -> T {
168        panic!("Task framework not initialized");
169    }
170
171    fn foreground(&self) -> &ForegroundSpawner {
172        match self {
173            Spawner::Foreground(fg) => fg,
174            Spawner::Uninit => self.panic_uninit(),
175            _ => {
176                panic!("Not on foreground thread");
177            }
178        }
179    }
180
181    fn which_type(&self) -> Type {
182        match self {
183            Spawner::Foreground(_) => Type::Foreground,
184            Spawner::Background(_) => Type::Background,
185            Spawner::Uninit => Type::Uninit,
186        }
187    }
188}
189
190/// Spawn a foreground task from foreground thread.
191///
192/// This function can only be called from the foreground
193/// thread. Actually, if a background thread want to run
194/// a task in foreground thread, the Future must be Send,
195/// while this method does not enforces it.
196#[must_use = "Dropping the Handle is equivalent to canceling the future."]
197pub fn spawn_foreground<F, T>(future: F) -> impl Handle<T>
198where
199    F: Future<Output = T> + 'static,
200    T: 'static,
201{
202    SPAWNER.with_borrow(|w| w.foreground().spawn_foreground(future))
203}
204
205/// Spawns a background task.
206///
207/// This function can either be called from a foreground
208/// thread or a background thread, and the future must be
209/// Send since we are using tokio as the task executor,
210/// which will possibly move tasks between threads.
211#[must_use = "Dropping the Handle is equivalent to canceling the future."]
212pub fn dispatch_background<F, T>(future: F) -> impl Handle<T>
213where
214    F: Future<Output = T> + Send + 'static,
215    T: Send + 'static,
216{
217    SPAWNER.with_borrow(|w| {
218        // WARN: calling tokio::spawn requires tokio context, which
219        // is available on background thread, while we must explicitly
220        // call tokio_runtime.spawn on foreground thread.
221        match w {
222            Spawner::Foreground(fg) => fg.spawn_background(future),
223            Spawner::Background(bg) => bg.spawn_background(future),
224            Spawner::Uninit => w.panic_uninit(),
225            #[allow(unreachable_patterns)]
226            _ => {
227                panic!("Not on foreground or background thread.")
228            }
229        }
230    })
231}
232
233/// Spawns a foreground task.
234///
235/// This function can either be called from a foreground
236/// thread or a background thread, and the future must be
237/// Send since the future is sent to the foreground thread
238/// when dispatching from a background thread. The result
239/// will also need to be moved to the background thread.
240#[must_use = "Dropping the Handle is equivalent to canceling the future."]
241pub fn dispatch_foreground<F, T>(future: F) -> impl Handle<T>
242where
243    F: Future<Output = T> + Send + 'static,
244    T: Send + 'static,
245{
246    SPAWNER.with_borrow(|w| match w {
247        Spawner::Foreground(fg) => fg.spawn_foreground(future),
248        Spawner::Background(bg) => bg.spawn_foreground(future),
249        Spawner::Uninit => w.panic_uninit(),
250        #[allow(unreachable_patterns)]
251        _ => {
252            panic!("Not on foreground or background thread.")
253        }
254    })
255}
256
257/// Drains the foreground tasks dispatched from
258/// background threads and push them to the task queue
259/// of foreground thread.
260///
261/// This function must be called from the foreground
262/// thread. It's non-blocking, which means it returns
263/// immediately after pushing the tasks.
264pub fn drain_foreground_loopback() {
265    SPAWNER.with_borrow(|w| {
266        w.foreground().drain_foreground_loopback();
267    })
268}
269
270/// Runs the foreground tasks until stalled.
271///
272/// This function must only be called from the
273/// foreground thread. It will return as soon as
274/// all foreground tasks are stalled.
275pub fn run_foreground() {
276    SPAWNER.with_borrow(|w| {
277        w.foreground().run_foreground();
278    })
279}
280
281/// Returns the current task / thread type.
282pub fn current_type() -> Type {
283    SPAWNER.with_borrow(Spawner::which_type)
284}