1use futures::FutureExt;
2use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
3use futures::executor::{LocalPool, LocalSpawner};
4use futures::future::{BoxFuture, RemoteHandle};
5use futures::lock::Mutex;
6use futures::task::{LocalSpawnExt, SpawnExt};
7use std::cell::RefCell;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use tokio::runtime::Runtime;
11
/// A future that resolves to the output `T` of a spawned task and that can
/// also be detached from that task.
///
/// Awaiting a `Handle` yields the task's output. The spawning functions in
/// this module mark their returned handle `#[must_use]` because dropping it is
/// treated as cancelling the task; call [`Handle::detach`] to give the handle
/// up deliberately.
pub trait Handle<T: 'static>: Future<Output = T> {
    /// Consumes the handle without waiting for the task's output.
    fn detach(self);
}
27
28struct TaskHandle<T>
29where
30 T: 'static,
31{
32 handle: RemoteHandle<T>,
33}
34
35impl<T> TaskHandle<T>
36where
37 T: 'static,
38{
39 pub fn new(handle: RemoteHandle<T>) -> Self {
40 Self { handle }
41 }
42}
43
44impl<T> Future for TaskHandle<T>
45where
46 T: 'static,
47{
48 type Output = T;
49 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
50 self.handle.poll_unpin(cx)
51 }
52}
53
54impl<T> Handle<T> for TaskHandle<T>
55where
56 T: 'static,
57{
58 fn detach(self) {
59 self.handle.forget();
60 }
61}
62
63pub(crate) struct ForegroundSpawner {
64 pub(crate) local_pool: RefCell<LocalPool>,
65 pub(crate) local_spawner: LocalSpawner,
66 pub(crate) tokio_runtime: Runtime,
67 pub(crate) _loopback_send: UnboundedSender<BoxFuture<'static, ()>>,
68 pub(crate) loopback_recv: Mutex<UnboundedReceiver<BoxFuture<'static, ()>>>,
69}
70
71impl ForegroundSpawner {
72 fn spawn_foreground<F, T>(&self, future: F) -> TaskHandle<T>
73 where
74 F: Future<Output = T> + 'static,
75 T: 'static,
76 {
77 let handle = self.local_spawner.spawn_local_with_handle(future).unwrap();
78 TaskHandle::new(handle)
79 }
80
81 fn spawn_background<F, T>(&self, future: F) -> TaskHandle<T>
82 where
83 F: Future<Output = T> + Send + 'static,
84 T: Send + 'static,
85 {
86 let (remote, handle) = future.remote_handle();
90 let _ = self.tokio_runtime.spawn(remote);
91 TaskHandle::new(handle)
92 }
93
94 fn drain_foreground_loopback(&self) -> Option<()> {
95 let mut loopback_recv = self.loopback_recv.try_lock()?;
96 loop {
97 self.local_spawner
98 .spawn(loopback_recv.try_next().ok()??)
99 .unwrap();
100 }
101 }
102
103 fn run_foreground(&self) {
104 self.local_pool.borrow_mut().run_until_stalled();
107 }
108}
109
110pub(crate) struct BackgroundSpawner {
111 pub(crate) loopback_send: UnboundedSender<BoxFuture<'static, ()>>,
112}
113
114impl BackgroundSpawner {
115 fn spawn_background<F, T>(&self, future: F) -> TaskHandle<T>
116 where
117 F: Future<Output = T> + Send + 'static,
118 T: Send + 'static,
119 {
120 let (remote, handle) = future.remote_handle();
123 let _ = tokio::spawn(remote);
124 TaskHandle::new(handle)
125 }
126
127 fn spawn_foreground<F, T>(&self, future: F) -> TaskHandle<T>
128 where
129 F: Future<Output = T> + Send + 'static,
130 T: Send + 'static,
131 {
132 let (remote, handle) = future.remote_handle();
133 self.loopback_send.unbounded_send(Box::pin(remote)).unwrap();
137 TaskHandle::new(handle)
138 }
139}
140
/// The kind of task spawner installed on the current thread, as reported by
/// `current_type`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Type {
    /// No spawner has been installed on this thread.
    Uninit,
    /// The thread holds the foreground spawner.
    Foreground,
    /// The thread holds a background spawner.
    Background,
}
155
156pub(crate) enum Spawner {
157 Uninit,
158 Foreground(ForegroundSpawner),
159 Background(BackgroundSpawner),
160}
161
162thread_local! {
163 pub(crate) static SPAWNER: RefCell<Spawner> = RefCell::new(Spawner::Uninit);
164}
165
166impl Spawner {
167 fn panic_uninit<T>(&self) -> T {
168 panic!("Task framework not initialized");
169 }
170
171 fn foreground(&self) -> &ForegroundSpawner {
172 match self {
173 Spawner::Foreground(fg) => fg,
174 Spawner::Uninit => self.panic_uninit(),
175 _ => {
176 panic!("Not on foreground thread");
177 }
178 }
179 }
180
181 fn which_type(&self) -> Type {
182 match self {
183 Spawner::Foreground(_) => Type::Foreground,
184 Spawner::Background(_) => Type::Background,
185 Spawner::Uninit => Type::Uninit,
186 }
187 }
188}
189
190#[must_use = "Dropping the Handle is equivalent to canceling the future."]
197pub fn spawn_foreground<F, T>(future: F) -> impl Handle<T>
198where
199 F: Future<Output = T> + 'static,
200 T: 'static,
201{
202 SPAWNER.with_borrow(|w| w.foreground().spawn_foreground(future))
203}
204
205#[must_use = "Dropping the Handle is equivalent to canceling the future."]
212pub fn dispatch_background<F, T>(future: F) -> impl Handle<T>
213where
214 F: Future<Output = T> + Send + 'static,
215 T: Send + 'static,
216{
217 SPAWNER.with_borrow(|w| {
218 match w {
222 Spawner::Foreground(fg) => fg.spawn_background(future),
223 Spawner::Background(bg) => bg.spawn_background(future),
224 Spawner::Uninit => w.panic_uninit(),
225 #[allow(unreachable_patterns)]
226 _ => {
227 panic!("Not on foreground or background thread.")
228 }
229 }
230 })
231}
232
233#[must_use = "Dropping the Handle is equivalent to canceling the future."]
241pub fn dispatch_foreground<F, T>(future: F) -> impl Handle<T>
242where
243 F: Future<Output = T> + Send + 'static,
244 T: Send + 'static,
245{
246 SPAWNER.with_borrow(|w| match w {
247 Spawner::Foreground(fg) => fg.spawn_foreground(future),
248 Spawner::Background(bg) => bg.spawn_foreground(future),
249 Spawner::Uninit => w.panic_uninit(),
250 #[allow(unreachable_patterns)]
251 _ => {
252 panic!("Not on foreground or background thread.")
253 }
254 })
255}
256
257pub fn drain_foreground_loopback() {
265 SPAWNER.with_borrow(|w| {
266 w.foreground().drain_foreground_loopback();
267 })
268}
269
270pub fn run_foreground() {
276 SPAWNER.with_borrow(|w| {
277 w.foreground().run_foreground();
278 })
279}
280
281pub fn current_type() -> Type {
283 SPAWNER.with_borrow(Spawner::which_type)
284}