1use crate::{App, PlatformDispatcher, PlatformScheduler};
2use futures::channel::mpsc;
3use futures::prelude::*;
4use gpui_util::{TryFutureExt, TryFutureExtBacktrace};
5use scheduler::Instant;
6use scheduler::Scheduler;
7use std::{future::Future, marker::PhantomData, mem, pin::Pin, rc::Rc, sync::Arc, time::Duration};
8
9pub use scheduler::{
10 FallibleTask, ForegroundExecutor as SchedulerForegroundExecutor, Priority, Task,
11};
12
13#[derive(Clone)]
16pub struct BackgroundExecutor {
17 inner: scheduler::BackgroundExecutor,
18 dispatcher: Arc<dyn PlatformDispatcher>,
19}
20
21#[derive(Clone)]
24pub struct ForegroundExecutor {
25 inner: scheduler::ForegroundExecutor,
26 dispatcher: Arc<dyn PlatformDispatcher>,
27 not_send: PhantomData<Rc<()>>,
28}
29
30pub trait TaskExt<T, E> {
34 fn detach_and_log_err(self, cx: &App);
36 fn detach_and_log_err_with_backtrace(self, cx: &App);
39}
40
41impl<T, E> TaskExt<T, E> for Task<Result<T, E>>
42where
43 T: 'static,
44 E: 'static + std::fmt::Display + std::fmt::Debug,
45{
46 #[track_caller]
47 fn detach_and_log_err(self, cx: &App) {
48 let location = core::panic::Location::caller();
49 cx.foreground_executor()
50 .spawn(self.log_tracked_err(*location))
51 .detach();
52 }
53
54 #[track_caller]
55 fn detach_and_log_err_with_backtrace(self, cx: &App) {
56 let location = *core::panic::Location::caller();
57 cx.foreground_executor()
58 .spawn(self.log_tracked_err_with_backtrace(location))
59 .detach();
60 }
61}
62
63impl BackgroundExecutor {
64 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
66 #[cfg(any(test, feature = "test-support"))]
67 let scheduler: Arc<dyn Scheduler> = if let Some(test_dispatcher) = dispatcher.as_test() {
68 test_dispatcher.scheduler().clone()
69 } else {
70 Arc::new(PlatformScheduler::new(dispatcher.clone()))
71 };
72
73 #[cfg(not(any(test, feature = "test-support")))]
74 let scheduler: Arc<dyn Scheduler> = Arc::new(PlatformScheduler::new(dispatcher.clone()));
75
76 Self {
77 inner: scheduler::BackgroundExecutor::new(scheduler),
78 dispatcher,
79 }
80 }
81
82 pub fn scheduler_executor(&self) -> scheduler::BackgroundExecutor {
86 self.inner.clone()
87 }
88
89 #[track_caller]
91 pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
92 where
93 R: Send + 'static,
94 {
95 self.spawn_with_priority(Priority::default(), future.boxed())
96 }
97
98 #[track_caller]
103 pub fn spawn_with_priority<R>(
104 &self,
105 priority: Priority,
106 future: impl Future<Output = R> + Send + 'static,
107 ) -> Task<R>
108 where
109 R: Send + 'static,
110 {
111 if priority == Priority::RealtimeAudio {
112 self.inner.spawn_realtime(future)
113 } else {
114 self.inner.spawn_with_priority(priority, future)
115 }
116 }
117
118 pub async fn scoped<'scope, F>(&self, scheduler: F)
121 where
122 F: FnOnce(&mut Scope<'scope>),
123 {
124 let mut scope = Scope::new(self.clone(), Priority::default());
125 (scheduler)(&mut scope);
126 let spawned = mem::take(&mut scope.futures)
127 .into_iter()
128 .map(|f| self.spawn_with_priority(scope.priority, f))
129 .collect::<Vec<_>>();
130 for task in spawned {
131 task.await;
132 }
133 }
134
135 pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
138 where
139 F: FnOnce(&mut Scope<'scope>),
140 {
141 let mut scope = Scope::new(self.clone(), priority);
142 (scheduler)(&mut scope);
143 let spawned = mem::take(&mut scope.futures)
144 .into_iter()
145 .map(|f| self.spawn_with_priority(scope.priority, f))
146 .collect::<Vec<_>>();
147 for task in spawned {
148 task.await;
149 }
150 }
151
152 pub fn now(&self) -> Instant {
157 self.inner.scheduler().clock().now()
158 }
159
160 #[track_caller]
164 pub fn timer(&self, duration: Duration) -> Task<()> {
165 if duration.is_zero() {
166 return Task::ready(());
167 }
168 self.spawn(self.inner.scheduler().timer(duration))
169 }
170
171 #[cfg(any(test, feature = "test-support"))]
173 pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
174 self.dispatcher.as_test().unwrap().simulate_random_delay()
175 }
176
177 #[cfg(any(test, feature = "test-support"))]
179 pub fn advance_clock(&self, duration: Duration) {
180 self.dispatcher.as_test().unwrap().advance_clock(duration)
181 }
182
183 #[cfg(any(test, feature = "test-support"))]
185 pub fn tick(&self) -> bool {
186 self.dispatcher.as_test().unwrap().scheduler().tick()
187 }
188
189 #[cfg(any(test, feature = "test-support"))]
196 pub fn run_until_parked(&self) {
197 let scheduler = self.dispatcher.as_test().unwrap().scheduler();
198 scheduler.run();
199 }
200
201 #[cfg(any(test, feature = "test-support"))]
203 pub fn allow_parking(&self) {
204 self.dispatcher
205 .as_test()
206 .unwrap()
207 .scheduler()
208 .allow_parking();
209
210 if std::env::var("GPUI_RUN_UNTIL_PARKED_LOG").ok().as_deref() == Some("1") {
211 log::warn!("[gpui::executor] allow_parking: enabled");
212 }
213 }
214
215 #[cfg(any(test, feature = "test-support"))]
217 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
218 self.dispatcher
219 .as_test()
220 .unwrap()
221 .scheduler()
222 .set_timeout_ticks(range);
223 }
224
225 #[cfg(any(test, feature = "test-support"))]
227 pub fn forbid_parking(&self) {
228 self.dispatcher
229 .as_test()
230 .unwrap()
231 .scheduler()
232 .forbid_parking();
233 }
234
235 #[cfg(any(test, feature = "test-support"))]
237 pub fn rng(&self) -> scheduler::SharedRng {
238 self.dispatcher.as_test().unwrap().scheduler().rng()
239 }
240
241 pub fn num_cpus(&self) -> usize {
243 #[cfg(any(test, feature = "test-support"))]
244 if let Some(test) = self.dispatcher.as_test() {
245 return test.num_cpus_override().unwrap_or(4);
246 }
247 num_cpus::get()
248 }
249
250 #[cfg(any(test, feature = "test-support"))]
253 pub fn set_num_cpus(&self, count: usize) {
254 self.dispatcher
255 .as_test()
256 .expect("set_num_cpus can only be called on a test executor")
257 .set_num_cpus(count);
258 }
259
260 pub fn is_main_thread(&self) -> bool {
262 self.dispatcher.is_main_thread()
263 }
264
265 #[doc(hidden)]
266 pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
267 &self.dispatcher
268 }
269}
270
271impl ForegroundExecutor {
272 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
274 #[cfg(any(test, feature = "test-support"))]
275 let (scheduler, session_id): (Arc<dyn Scheduler>, _) =
276 if let Some(test_dispatcher) = dispatcher.as_test() {
277 (
278 test_dispatcher.scheduler().clone(),
279 test_dispatcher.session_id(),
280 )
281 } else {
282 let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
283 let session_id = platform_scheduler.allocate_session_id();
284 (platform_scheduler, session_id)
285 };
286
287 #[cfg(not(any(test, feature = "test-support")))]
288 let (scheduler, session_id): (Arc<dyn Scheduler>, _) = {
289 let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
290 let session_id = platform_scheduler.allocate_session_id();
291 (platform_scheduler, session_id)
292 };
293
294 let inner = scheduler::ForegroundExecutor::new(session_id, scheduler);
295
296 Self {
297 inner,
298 dispatcher,
299 not_send: PhantomData,
300 }
301 }
302
303 #[track_caller]
305 pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
306 where
307 R: 'static,
308 {
309 self.inner.spawn(future.boxed_local())
310 }
311
312 #[track_caller]
314 pub fn spawn_with_priority<R>(
315 &self,
316 _priority: Priority,
317 future: impl Future<Output = R> + 'static,
318 ) -> Task<R>
319 where
320 R: 'static,
321 {
322 self.inner.spawn(future)
324 }
325
326 #[cfg(any(test, feature = "test-support"))]
328 #[track_caller]
329 pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
330 use std::cell::Cell;
331
332 let scheduler = self.inner.scheduler();
333
334 let output = Cell::new(None);
335 let future = async {
336 output.set(Some(future.await));
337 };
338 let mut future = std::pin::pin!(future);
339
340 scheduler.block(None, future.as_mut(), None);
344
345 output.take().expect("block_test future did not complete")
346 }
347
348 pub fn block_on<R>(&self, future: impl Future<Output = R>) -> R {
351 self.inner.block_on(future)
352 }
353
354 pub fn block_with_timeout<R, Fut: Future<Output = R>>(
356 &self,
357 duration: Duration,
358 future: Fut,
359 ) -> Result<R, impl Future<Output = R> + use<R, Fut>> {
360 self.inner.block_with_timeout(duration, future)
361 }
362
363 #[doc(hidden)]
364 pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
365 &self.dispatcher
366 }
367
368 #[doc(hidden)]
369 pub fn scheduler_executor(&self) -> SchedulerForegroundExecutor {
370 self.inner.clone()
371 }
372}
373
374pub struct Scope<'a> {
376 executor: BackgroundExecutor,
377 priority: Priority,
378 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
379 tx: Option<mpsc::Sender<()>>,
380 rx: mpsc::Receiver<()>,
381 lifetime: PhantomData<&'a ()>,
382}
383
384impl<'a> Scope<'a> {
385 fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
386 let (tx, rx) = mpsc::channel(1);
387 Self {
388 executor,
389 priority,
390 tx: Some(tx),
391 rx,
392 futures: Default::default(),
393 lifetime: PhantomData,
394 }
395 }
396
397 pub fn num_cpus(&self) -> usize {
399 self.executor.num_cpus()
400 }
401
402 #[track_caller]
404 pub fn spawn<F>(&mut self, f: F)
405 where
406 F: Future<Output = ()> + Send + 'a,
407 {
408 let tx = self.tx.clone().unwrap();
409
410 let f = unsafe {
413 mem::transmute::<
414 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
415 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
416 >(Box::pin(async move {
417 f.await;
418 drop(tx);
419 }))
420 };
421 self.futures.push(f);
422 }
423}
424
425impl Drop for Scope<'_> {
426 fn drop(&mut self) {
427 self.tx.take().unwrap();
428
429 let future = async {
432 self.rx.next().await;
433 };
434 let mut future = std::pin::pin!(future);
435 self.executor
436 .inner
437 .scheduler()
438 .block(None, future.as_mut(), None);
439 }
440}
441
442#[cfg(test)]
443mod test {
444 use super::*;
445 use crate::{App, TestDispatcher, TestPlatform};
446 use std::cell::RefCell;
447
448 fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
451 let dispatcher = TestDispatcher::new(0);
452 let arc_dispatcher = Arc::new(dispatcher.clone());
453 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
454 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
455
456 let platform = TestPlatform::new(background_executor.clone(), foreground_executor);
457 let asset_source = Arc::new(());
458 let http_client = http_client::FakeHttpClient::with_404_response();
459
460 let app = App::new_app(platform, asset_source, http_client);
461 (dispatcher, background_executor, app)
462 }
463
464 #[test]
465 fn sanity_test_tasks_run() {
466 let (dispatcher, _background_executor, app) = create_test_app();
467 let foreground_executor = app.borrow().foreground_executor.clone();
468
469 let task_ran = Rc::new(RefCell::new(false));
470
471 foreground_executor
472 .spawn({
473 let task_ran = Rc::clone(&task_ran);
474 async move {
475 *task_ran.borrow_mut() = true;
476 }
477 })
478 .detach();
479
480 dispatcher.run_until_parked();
482
483 assert!(
485 *task_ran.borrow(),
486 "Task should run normally when app is alive"
487 );
488 }
489}