1use crate::{App, PlatformDispatcher, PlatformScheduler};
2use futures::channel::mpsc;
3use futures::prelude::*;
4use gpui_util::{TryFutureExt, TryFutureExtBacktrace};
5use scheduler::Instant;
6use scheduler::Scheduler;
7use std::{future::Future, marker::PhantomData, mem, pin::Pin, rc::Rc, sync::Arc, time::Duration};
8
9pub use scheduler::{
10 FallibleTask, ForegroundExecutor as SchedulerForegroundExecutor, Priority, Task,
11};
12
13#[derive(Clone)]
16pub struct BackgroundExecutor {
17 inner: scheduler::BackgroundExecutor,
18 dispatcher: Arc<dyn PlatformDispatcher>,
19}
20
21#[derive(Clone)]
24pub struct ForegroundExecutor {
25 inner: scheduler::ForegroundExecutor,
26 dispatcher: Arc<dyn PlatformDispatcher>,
27 not_send: PhantomData<Rc<()>>,
28}
29
30pub trait TaskExt<T, E> {
34 fn detach_and_log_err(self, cx: &App);
36 fn detach_and_log_err_with_backtrace(self, cx: &App);
39}
40
41impl<T, E> TaskExt<T, E> for Task<Result<T, E>>
42where
43 T: 'static,
44 E: 'static + std::fmt::Display + std::fmt::Debug,
45{
46 #[track_caller]
47 fn detach_and_log_err(self, cx: &App) {
48 let location = core::panic::Location::caller();
49 cx.foreground_executor()
50 .spawn(self.log_tracked_err(*location))
51 .detach();
52 }
53
54 #[track_caller]
55 fn detach_and_log_err_with_backtrace(self, cx: &App) {
56 let location = *core::panic::Location::caller();
57 cx.foreground_executor()
58 .spawn(self.log_tracked_err_with_backtrace(location))
59 .detach();
60 }
61}
62
63impl BackgroundExecutor {
64 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
66 #[cfg(any(test, feature = "test-support"))]
67 let scheduler: Arc<dyn Scheduler> = if let Some(test_dispatcher) = dispatcher.as_test() {
68 test_dispatcher.scheduler().clone()
69 } else {
70 Arc::new(PlatformScheduler::new(dispatcher.clone()))
71 };
72
73 #[cfg(not(any(test, feature = "test-support")))]
74 let scheduler: Arc<dyn Scheduler> = Arc::new(PlatformScheduler::new(dispatcher.clone()));
75
76 Self {
77 inner: scheduler::BackgroundExecutor::new(scheduler),
78 dispatcher,
79 }
80 }
81
82 pub fn scheduler_executor(&self) -> scheduler::BackgroundExecutor {
86 self.inner.clone()
87 }
88
89 #[track_caller]
91 pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
92 where
93 R: Send + 'static,
94 {
95 self.spawn_with_priority(Priority::default(), future.boxed())
96 }
97
98 #[track_caller]
103 pub fn spawn_with_priority<R>(
104 &self,
105 priority: Priority,
106 future: impl Future<Output = R> + Send + 'static,
107 ) -> Task<R>
108 where
109 R: Send + 'static,
110 {
111 if priority == Priority::RealtimeAudio {
112 self.inner.spawn_realtime(future)
113 } else {
114 self.inner.spawn_with_priority(priority, future)
115 }
116 }
117
118 pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
123 where
124 R: Send,
125 {
126 use crate::RunnableMeta;
127 use parking_lot::{Condvar, Mutex};
128
129 struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
130
131 impl Drop for NotifyOnDrop<'_> {
132 fn drop(&mut self) {
133 *self.0.1.lock() = true;
134 self.0.0.notify_all();
135 }
136 }
137
138 struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
139
140 impl Drop for WaitOnDrop<'_> {
141 fn drop(&mut self) {
142 let mut done = self.0.1.lock();
143 if !*done {
144 self.0.0.wait(&mut done);
145 }
146 }
147 }
148
149 let dispatcher = self.dispatcher.clone();
150 let location = core::panic::Location::caller();
151
152 let pair = &(Condvar::new(), Mutex::new(false));
153 let _wait_guard = WaitOnDrop(pair);
154
155 let (runnable, task) = unsafe {
156 async_task::Builder::new()
157 .metadata(RunnableMeta { location })
158 .spawn_unchecked(
159 move |_| async {
160 let _notify_guard = NotifyOnDrop(pair);
161 future.await
162 },
163 move |runnable| {
164 dispatcher.dispatch(runnable, Priority::default());
165 },
166 )
167 };
168 runnable.schedule();
169 task.await
170 }
171
172 pub async fn scoped<'scope, F>(&self, scheduler: F)
175 where
176 F: FnOnce(&mut Scope<'scope>),
177 {
178 let mut scope = Scope::new(self.clone(), Priority::default());
179 (scheduler)(&mut scope);
180 let spawned = mem::take(&mut scope.futures)
181 .into_iter()
182 .map(|f| self.spawn_with_priority(scope.priority, f))
183 .collect::<Vec<_>>();
184 for task in spawned {
185 task.await;
186 }
187 }
188
189 pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
192 where
193 F: FnOnce(&mut Scope<'scope>),
194 {
195 let mut scope = Scope::new(self.clone(), priority);
196 (scheduler)(&mut scope);
197 let spawned = mem::take(&mut scope.futures)
198 .into_iter()
199 .map(|f| self.spawn_with_priority(scope.priority, f))
200 .collect::<Vec<_>>();
201 for task in spawned {
202 task.await;
203 }
204 }
205
206 pub fn now(&self) -> Instant {
211 self.inner.scheduler().clock().now()
212 }
213
214 #[track_caller]
218 pub fn timer(&self, duration: Duration) -> Task<()> {
219 if duration.is_zero() {
220 return Task::ready(());
221 }
222 self.spawn(self.inner.scheduler().timer(duration))
223 }
224
225 #[cfg(any(test, feature = "test-support"))]
227 pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
228 self.dispatcher.as_test().unwrap().simulate_random_delay()
229 }
230
231 #[cfg(any(test, feature = "test-support"))]
233 pub fn advance_clock(&self, duration: Duration) {
234 self.dispatcher.as_test().unwrap().advance_clock(duration)
235 }
236
237 #[cfg(any(test, feature = "test-support"))]
239 pub fn tick(&self) -> bool {
240 self.dispatcher.as_test().unwrap().scheduler().tick()
241 }
242
243 #[cfg(any(test, feature = "test-support"))]
250 pub fn run_until_parked(&self) {
251 let scheduler = self.dispatcher.as_test().unwrap().scheduler();
252 scheduler.run();
253 }
254
255 #[cfg(any(test, feature = "test-support"))]
257 pub fn allow_parking(&self) {
258 self.dispatcher
259 .as_test()
260 .unwrap()
261 .scheduler()
262 .allow_parking();
263
264 if std::env::var("GPUI_RUN_UNTIL_PARKED_LOG").ok().as_deref() == Some("1") {
265 log::warn!("[gpui::executor] allow_parking: enabled");
266 }
267 }
268
269 #[cfg(any(test, feature = "test-support"))]
271 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
272 self.dispatcher
273 .as_test()
274 .unwrap()
275 .scheduler()
276 .set_timeout_ticks(range);
277 }
278
279 #[cfg(any(test, feature = "test-support"))]
281 pub fn forbid_parking(&self) {
282 self.dispatcher
283 .as_test()
284 .unwrap()
285 .scheduler()
286 .forbid_parking();
287 }
288
289 #[cfg(any(test, feature = "test-support"))]
291 pub fn rng(&self) -> scheduler::SharedRng {
292 self.dispatcher.as_test().unwrap().scheduler().rng()
293 }
294
295 pub fn num_cpus(&self) -> usize {
297 #[cfg(any(test, feature = "test-support"))]
298 if let Some(test) = self.dispatcher.as_test() {
299 return test.num_cpus_override().unwrap_or(4);
300 }
301 num_cpus::get()
302 }
303
304 #[cfg(any(test, feature = "test-support"))]
307 pub fn set_num_cpus(&self, count: usize) {
308 self.dispatcher
309 .as_test()
310 .expect("set_num_cpus can only be called on a test executor")
311 .set_num_cpus(count);
312 }
313
314 pub fn is_main_thread(&self) -> bool {
316 self.dispatcher.is_main_thread()
317 }
318
319 #[doc(hidden)]
320 pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
321 &self.dispatcher
322 }
323}
324
325impl ForegroundExecutor {
326 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
328 #[cfg(any(test, feature = "test-support"))]
329 let (scheduler, session_id): (Arc<dyn Scheduler>, _) =
330 if let Some(test_dispatcher) = dispatcher.as_test() {
331 (
332 test_dispatcher.scheduler().clone(),
333 test_dispatcher.session_id(),
334 )
335 } else {
336 let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
337 let session_id = platform_scheduler.allocate_session_id();
338 (platform_scheduler, session_id)
339 };
340
341 #[cfg(not(any(test, feature = "test-support")))]
342 let (scheduler, session_id): (Arc<dyn Scheduler>, _) = {
343 let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
344 let session_id = platform_scheduler.allocate_session_id();
345 (platform_scheduler, session_id)
346 };
347
348 let inner = scheduler::ForegroundExecutor::new(session_id, scheduler);
349
350 Self {
351 inner,
352 dispatcher,
353 not_send: PhantomData,
354 }
355 }
356
357 #[track_caller]
359 pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
360 where
361 R: 'static,
362 {
363 self.inner.spawn(future.boxed_local())
364 }
365
366 #[track_caller]
368 pub fn spawn_with_priority<R>(
369 &self,
370 _priority: Priority,
371 future: impl Future<Output = R> + 'static,
372 ) -> Task<R>
373 where
374 R: 'static,
375 {
376 self.inner.spawn(future)
378 }
379
380 #[cfg(any(test, feature = "test-support"))]
382 #[track_caller]
383 pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
384 use std::cell::Cell;
385
386 let scheduler = self.inner.scheduler();
387
388 let output = Cell::new(None);
389 let future = async {
390 output.set(Some(future.await));
391 };
392 let mut future = std::pin::pin!(future);
393
394 scheduler.block(None, future.as_mut(), None);
398
399 output.take().expect("block_test future did not complete")
400 }
401
402 pub fn block_on<R>(&self, future: impl Future<Output = R>) -> R {
405 self.inner.block_on(future)
406 }
407
408 pub fn block_with_timeout<R, Fut: Future<Output = R>>(
410 &self,
411 duration: Duration,
412 future: Fut,
413 ) -> Result<R, impl Future<Output = R> + use<R, Fut>> {
414 self.inner.block_with_timeout(duration, future)
415 }
416
417 #[doc(hidden)]
418 pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
419 &self.dispatcher
420 }
421
422 #[doc(hidden)]
423 pub fn scheduler_executor(&self) -> SchedulerForegroundExecutor {
424 self.inner.clone()
425 }
426}
427
428pub struct Scope<'a> {
430 executor: BackgroundExecutor,
431 priority: Priority,
432 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
433 tx: Option<mpsc::Sender<()>>,
434 rx: mpsc::Receiver<()>,
435 lifetime: PhantomData<&'a ()>,
436}
437
438impl<'a> Scope<'a> {
439 fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
440 let (tx, rx) = mpsc::channel(1);
441 Self {
442 executor,
443 priority,
444 tx: Some(tx),
445 rx,
446 futures: Default::default(),
447 lifetime: PhantomData,
448 }
449 }
450
451 pub fn num_cpus(&self) -> usize {
453 self.executor.num_cpus()
454 }
455
456 #[track_caller]
458 pub fn spawn<F>(&mut self, f: F)
459 where
460 F: Future<Output = ()> + Send + 'a,
461 {
462 let tx = self.tx.clone().unwrap();
463
464 let f = unsafe {
467 mem::transmute::<
468 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
469 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
470 >(Box::pin(async move {
471 f.await;
472 drop(tx);
473 }))
474 };
475 self.futures.push(f);
476 }
477}
478
479impl Drop for Scope<'_> {
480 fn drop(&mut self) {
481 self.tx.take().unwrap();
482
483 let future = async {
486 self.rx.next().await;
487 };
488 let mut future = std::pin::pin!(future);
489 self.executor
490 .inner
491 .scheduler()
492 .block(None, future.as_mut(), None);
493 }
494}
495
#[cfg(test)]
mod test {
    use super::*;
    use crate::{App, TestDispatcher, TestPlatform};
    use std::cell::Cell;

    /// Builds an `App` on a test platform whose executors share one `TestDispatcher`.
    ///
    /// Returns the dispatcher, the background executor, and the app cell.
    fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
        let dispatcher = TestDispatcher::new(0);
        let shared_dispatcher = Arc::new(dispatcher.clone());
        let background = BackgroundExecutor::new(shared_dispatcher.clone());
        let foreground = ForegroundExecutor::new(shared_dispatcher);

        let platform = TestPlatform::new(background.clone(), foreground);
        let asset_source = Arc::new(());
        let http_client = http_client::FakeHttpClient::with_404_response();

        let app = App::new_app(platform, asset_source, http_client);
        (dispatcher, background, app)
    }

    /// A detached foreground task runs once the dispatcher is driven until parked.
    #[test]
    fn sanity_test_tasks_run() {
        let (dispatcher, _background_executor, app) = create_test_app();
        let foreground_executor = app.borrow().foreground_executor.clone();

        let task_ran = Rc::new(Cell::new(false));
        let flag = Rc::clone(&task_ran);

        foreground_executor
            .spawn(async move {
                flag.set(true);
            })
            .detach();

        dispatcher.run_until_parked();

        assert!(
            task_ran.get(),
            "Task should run normally when app is alive"
        );
    }
}