Skip to main content

gpui/
executor.rs

1use crate::{App, PlatformDispatcher, PlatformScheduler};
2use futures::channel::mpsc;
3use futures::prelude::*;
4use gpui_util::{TryFutureExt, TryFutureExtBacktrace};
5use scheduler::Instant;
6use scheduler::Scheduler;
7use std::{future::Future, marker::PhantomData, mem, pin::Pin, rc::Rc, sync::Arc, time::Duration};
8
9pub use scheduler::{
10    FallibleTask, ForegroundExecutor as SchedulerForegroundExecutor, Priority, Task,
11};
12
13/// A pointer to the executor that is currently running,
14/// for spawning background tasks.
15#[derive(Clone)]
16pub struct BackgroundExecutor {
17    inner: scheduler::BackgroundExecutor,
18    dispatcher: Arc<dyn PlatformDispatcher>,
19}
20
21/// A pointer to the executor that is currently running,
22/// for spawning tasks on the main thread.
23#[derive(Clone)]
24pub struct ForegroundExecutor {
25    inner: scheduler::ForegroundExecutor,
26    dispatcher: Arc<dyn PlatformDispatcher>,
27    not_send: PhantomData<Rc<()>>,
28}
29
30/// Extension trait for `Task<Result<T, E>>` that adds `detach_and_log_err` with an `&App` context.
31///
32/// This trait is automatically implemented for all `Task<Result<T, E>>` types.
33pub trait TaskExt<T, E> {
34    /// Run the task to completion in the background and log any errors that occur.
35    fn detach_and_log_err(self, cx: &App);
36    /// Like [`Self::detach_and_log_err`], but uses `{:?}` formatting on failure so `anyhow::Error`
37    /// values emit their full backtrace. Prefer `detach_and_log_err` unless a backtrace is wanted.
38    fn detach_and_log_err_with_backtrace(self, cx: &App);
39}
40
41impl<T, E> TaskExt<T, E> for Task<Result<T, E>>
42where
43    T: 'static,
44    E: 'static + std::fmt::Display + std::fmt::Debug,
45{
46    #[track_caller]
47    fn detach_and_log_err(self, cx: &App) {
48        let location = core::panic::Location::caller();
49        cx.foreground_executor()
50            .spawn(self.log_tracked_err(*location))
51            .detach();
52    }
53
54    #[track_caller]
55    fn detach_and_log_err_with_backtrace(self, cx: &App) {
56        let location = *core::panic::Location::caller();
57        cx.foreground_executor()
58            .spawn(self.log_tracked_err_with_backtrace(location))
59            .detach();
60    }
61}
62
63impl BackgroundExecutor {
64    /// Creates a new BackgroundExecutor from the given PlatformDispatcher.
65    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
66        #[cfg(any(test, feature = "test-support"))]
67        let scheduler: Arc<dyn Scheduler> = if let Some(test_dispatcher) = dispatcher.as_test() {
68            test_dispatcher.scheduler().clone()
69        } else {
70            Arc::new(PlatformScheduler::new(dispatcher.clone()))
71        };
72
73        #[cfg(not(any(test, feature = "test-support")))]
74        let scheduler: Arc<dyn Scheduler> = Arc::new(PlatformScheduler::new(dispatcher.clone()));
75
76        Self {
77            inner: scheduler::BackgroundExecutor::new(scheduler),
78            dispatcher,
79        }
80    }
81
82    /// Returns the underlying scheduler::BackgroundExecutor.
83    ///
84    /// This is used by Ex to pass the executor to thread/worktree code.
85    pub fn scheduler_executor(&self) -> scheduler::BackgroundExecutor {
86        self.inner.clone()
87    }
88
89    /// Enqueues the given future to be run to completion on a background thread.
90    #[track_caller]
91    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
92    where
93        R: Send + 'static,
94    {
95        self.spawn_with_priority(Priority::default(), future.boxed())
96    }
97
98    /// Enqueues the given future to be run to completion on a background thread with the given priority.
99    ///
100    /// When `Priority::RealtimeAudio` is used, the task runs on a dedicated thread with
101    /// realtime scheduling priority, suitable for audio processing.
102    #[track_caller]
103    pub fn spawn_with_priority<R>(
104        &self,
105        priority: Priority,
106        future: impl Future<Output = R> + Send + 'static,
107    ) -> Task<R>
108    where
109        R: Send + 'static,
110    {
111        if priority == Priority::RealtimeAudio {
112            self.inner.spawn_realtime(future)
113        } else {
114            self.inner.spawn_with_priority(priority, future)
115        }
116    }
117
118    /// Enqueues the given future to be run to completion on a background thread and blocking the current task on it.
119    ///
120    /// This allows to spawn background work that borrows from its scope. Note that the supplied future will run to
121    /// completion before the current task is resumed, even if the current task is slated for cancellation.
122    pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
123    where
124        R: Send,
125    {
126        use crate::RunnableMeta;
127        use parking_lot::{Condvar, Mutex};
128
129        struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
130
131        impl Drop for NotifyOnDrop<'_> {
132            fn drop(&mut self) {
133                *self.0.1.lock() = true;
134                self.0.0.notify_all();
135            }
136        }
137
138        struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
139
140        impl Drop for WaitOnDrop<'_> {
141            fn drop(&mut self) {
142                let mut done = self.0.1.lock();
143                if !*done {
144                    self.0.0.wait(&mut done);
145                }
146            }
147        }
148
149        let dispatcher = self.dispatcher.clone();
150        let location = core::panic::Location::caller();
151
152        let pair = &(Condvar::new(), Mutex::new(false));
153        let _wait_guard = WaitOnDrop(pair);
154
155        let (runnable, task) = unsafe {
156            async_task::Builder::new()
157                .metadata(RunnableMeta { location })
158                .spawn_unchecked(
159                    move |_| async {
160                        let _notify_guard = NotifyOnDrop(pair);
161                        future.await
162                    },
163                    move |runnable| {
164                        dispatcher.dispatch(runnable, Priority::default());
165                    },
166                )
167        };
168        runnable.schedule();
169        task.await
170    }
171
172    /// Scoped lets you start a number of tasks and waits
173    /// for all of them to complete before returning.
174    pub async fn scoped<'scope, F>(&self, scheduler: F)
175    where
176        F: FnOnce(&mut Scope<'scope>),
177    {
178        let mut scope = Scope::new(self.clone(), Priority::default());
179        (scheduler)(&mut scope);
180        let spawned = mem::take(&mut scope.futures)
181            .into_iter()
182            .map(|f| self.spawn_with_priority(scope.priority, f))
183            .collect::<Vec<_>>();
184        for task in spawned {
185            task.await;
186        }
187    }
188
189    /// Scoped lets you start a number of tasks and waits
190    /// for all of them to complete before returning.
191    pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
192    where
193        F: FnOnce(&mut Scope<'scope>),
194    {
195        let mut scope = Scope::new(self.clone(), priority);
196        (scheduler)(&mut scope);
197        let spawned = mem::take(&mut scope.futures)
198            .into_iter()
199            .map(|f| self.spawn_with_priority(scope.priority, f))
200            .collect::<Vec<_>>();
201        for task in spawned {
202            task.await;
203        }
204    }
205
206    /// Get the current time.
207    ///
208    /// Calling this instead of `std::time::Instant::now` allows the use
209    /// of fake timers in tests.
210    pub fn now(&self) -> Instant {
211        self.inner.scheduler().clock().now()
212    }
213
214    /// Returns a task that will complete after the given duration.
215    /// Depending on other concurrent tasks the elapsed duration may be longer
216    /// than requested.
217    #[track_caller]
218    pub fn timer(&self, duration: Duration) -> Task<()> {
219        if duration.is_zero() {
220            return Task::ready(());
221        }
222        self.spawn(self.inner.scheduler().timer(duration))
223    }
224
225    /// In tests, run an arbitrary number of tasks (determined by the SEED environment variable)
226    #[cfg(any(test, feature = "test-support"))]
227    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
228        self.dispatcher.as_test().unwrap().simulate_random_delay()
229    }
230
231    /// In tests, move time forward. This does not run any tasks, but does make `timer`s ready.
232    #[cfg(any(test, feature = "test-support"))]
233    pub fn advance_clock(&self, duration: Duration) {
234        self.dispatcher.as_test().unwrap().advance_clock(duration)
235    }
236
237    /// In tests, run one task.
238    #[cfg(any(test, feature = "test-support"))]
239    pub fn tick(&self) -> bool {
240        self.dispatcher.as_test().unwrap().scheduler().tick()
241    }
242
243    /// In tests, run tasks until the scheduler would park.
244    ///
245    /// Under the scheduler-backed test dispatcher, `tick()` will not advance the clock, so a pending
246    /// timer can keep `has_pending_tasks()` true even after all currently-runnable tasks have been
247    /// drained. To preserve the historical semantics that tests relied on (drain all work that can
248    /// make progress), we advance the clock to the next timer when no runnable tasks remain.
249    #[cfg(any(test, feature = "test-support"))]
250    pub fn run_until_parked(&self) {
251        let scheduler = self.dispatcher.as_test().unwrap().scheduler();
252        scheduler.run();
253    }
254
255    /// In tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
256    #[cfg(any(test, feature = "test-support"))]
257    pub fn allow_parking(&self) {
258        self.dispatcher
259            .as_test()
260            .unwrap()
261            .scheduler()
262            .allow_parking();
263
264        if std::env::var("GPUI_RUN_UNTIL_PARKED_LOG").ok().as_deref() == Some("1") {
265            log::warn!("[gpui::executor] allow_parking: enabled");
266        }
267    }
268
269    /// Sets the range of ticks to run before timing out in block_on.
270    #[cfg(any(test, feature = "test-support"))]
271    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
272        self.dispatcher
273            .as_test()
274            .unwrap()
275            .scheduler()
276            .set_timeout_ticks(range);
277    }
278
279    /// Undoes the effect of [`Self::allow_parking`].
280    #[cfg(any(test, feature = "test-support"))]
281    pub fn forbid_parking(&self) {
282        self.dispatcher
283            .as_test()
284            .unwrap()
285            .scheduler()
286            .forbid_parking();
287    }
288
289    /// In tests, returns the rng used by the dispatcher.
290    #[cfg(any(test, feature = "test-support"))]
291    pub fn rng(&self) -> scheduler::SharedRng {
292        self.dispatcher.as_test().unwrap().scheduler().rng()
293    }
294
295    /// How many CPUs are available to the dispatcher.
296    pub fn num_cpus(&self) -> usize {
297        #[cfg(any(test, feature = "test-support"))]
298        if let Some(test) = self.dispatcher.as_test() {
299            return test.num_cpus_override().unwrap_or(4);
300        }
301        num_cpus::get()
302    }
303
304    /// Override the number of CPUs reported by this executor in tests.
305    /// Panics if not called on a test executor.
306    #[cfg(any(test, feature = "test-support"))]
307    pub fn set_num_cpus(&self, count: usize) {
308        self.dispatcher
309            .as_test()
310            .expect("set_num_cpus can only be called on a test executor")
311            .set_num_cpus(count);
312    }
313
314    /// Whether we're on the main thread.
315    pub fn is_main_thread(&self) -> bool {
316        self.dispatcher.is_main_thread()
317    }
318
319    #[doc(hidden)]
320    pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
321        &self.dispatcher
322    }
323}
324
325impl ForegroundExecutor {
326    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
327    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
328        #[cfg(any(test, feature = "test-support"))]
329        let (scheduler, session_id): (Arc<dyn Scheduler>, _) =
330            if let Some(test_dispatcher) = dispatcher.as_test() {
331                (
332                    test_dispatcher.scheduler().clone(),
333                    test_dispatcher.session_id(),
334                )
335            } else {
336                let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
337                let session_id = platform_scheduler.allocate_session_id();
338                (platform_scheduler, session_id)
339            };
340
341        #[cfg(not(any(test, feature = "test-support")))]
342        let (scheduler, session_id): (Arc<dyn Scheduler>, _) = {
343            let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
344            let session_id = platform_scheduler.allocate_session_id();
345            (platform_scheduler, session_id)
346        };
347
348        let inner = scheduler::ForegroundExecutor::new(session_id, scheduler);
349
350        Self {
351            inner,
352            dispatcher,
353            not_send: PhantomData,
354        }
355    }
356
357    /// Enqueues the given Task to run on the main thread.
358    #[track_caller]
359    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
360    where
361        R: 'static,
362    {
363        self.inner.spawn(future.boxed_local())
364    }
365
366    /// Enqueues the given Task to run on the main thread with the given priority.
367    #[track_caller]
368    pub fn spawn_with_priority<R>(
369        &self,
370        _priority: Priority,
371        future: impl Future<Output = R> + 'static,
372    ) -> Task<R>
373    where
374        R: 'static,
375    {
376        // Priority is ignored for foreground tasks - they run in order on the main thread
377        self.inner.spawn(future)
378    }
379
380    /// Used by the test harness to run an async test in a synchronous fashion.
381    #[cfg(any(test, feature = "test-support"))]
382    #[track_caller]
383    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
384        use std::cell::Cell;
385
386        let scheduler = self.inner.scheduler();
387
388        let output = Cell::new(None);
389        let future = async {
390            output.set(Some(future.await));
391        };
392        let mut future = std::pin::pin!(future);
393
394        // In async GPUI tests, we must allow foreground tasks scheduled by the test itself
395        // (which are associated with the test session) to make progress while we block.
396        // Otherwise, awaiting futures that depend on same-session foreground work can deadlock.
397        scheduler.block(None, future.as_mut(), None);
398
399        output.take().expect("block_test future did not complete")
400    }
401
402    /// Block the current thread until the given future resolves.
403    /// Consider using `block_with_timeout` instead.
404    pub fn block_on<R>(&self, future: impl Future<Output = R>) -> R {
405        self.inner.block_on(future)
406    }
407
408    /// Block the current thread until the given future resolves or the timeout elapses.
409    pub fn block_with_timeout<R, Fut: Future<Output = R>>(
410        &self,
411        duration: Duration,
412        future: Fut,
413    ) -> Result<R, impl Future<Output = R> + use<R, Fut>> {
414        self.inner.block_with_timeout(duration, future)
415    }
416
417    #[doc(hidden)]
418    pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
419        &self.dispatcher
420    }
421
422    #[doc(hidden)]
423    pub fn scheduler_executor(&self) -> SchedulerForegroundExecutor {
424        self.inner.clone()
425    }
426}
427
428/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
429pub struct Scope<'a> {
430    executor: BackgroundExecutor,
431    priority: Priority,
432    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
433    tx: Option<mpsc::Sender<()>>,
434    rx: mpsc::Receiver<()>,
435    lifetime: PhantomData<&'a ()>,
436}
437
438impl<'a> Scope<'a> {
439    fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
440        let (tx, rx) = mpsc::channel(1);
441        Self {
442            executor,
443            priority,
444            tx: Some(tx),
445            rx,
446            futures: Default::default(),
447            lifetime: PhantomData,
448        }
449    }
450
451    /// How many CPUs are available to the dispatcher.
452    pub fn num_cpus(&self) -> usize {
453        self.executor.num_cpus()
454    }
455
456    /// Spawn a future into this scope.
457    #[track_caller]
458    pub fn spawn<F>(&mut self, f: F)
459    where
460        F: Future<Output = ()> + Send + 'a,
461    {
462        let tx = self.tx.clone().unwrap();
463
464        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
465        // dropping this `Scope` blocks until all of the futures have resolved.
466        let f = unsafe {
467            mem::transmute::<
468                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
469                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
470            >(Box::pin(async move {
471                f.await;
472                drop(tx);
473            }))
474        };
475        self.futures.push(f);
476    }
477}
478
479impl Drop for Scope<'_> {
480    fn drop(&mut self) {
481        self.tx.take().unwrap();
482
483        // Wait until the channel is closed, which means that all of the spawned
484        // futures have resolved.
485        let future = async {
486            self.rx.next().await;
487        };
488        let mut future = std::pin::pin!(future);
489        self.executor
490            .inner
491            .scheduler()
492            .block(None, future.as_mut(), None);
493    }
494}
495
#[cfg(test)]
mod test {
    use super::*;
    use crate::{App, TestDispatcher, TestPlatform};
    use std::cell::Cell;

    /// Builds a test `App` whose executors share a single `TestDispatcher`.
    ///
    /// Returns `(dispatcher, background_executor, app)`.
    fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
        let dispatcher = TestDispatcher::new(0);
        let shared_dispatcher = Arc::new(dispatcher.clone());
        let background_executor = BackgroundExecutor::new(shared_dispatcher.clone());
        let foreground_executor = ForegroundExecutor::new(shared_dispatcher);

        let app = App::new_app(
            TestPlatform::new(background_executor.clone(), foreground_executor),
            Arc::new(()),
            http_client::FakeHttpClient::with_404_response(),
        );
        (dispatcher, background_executor, app)
    }

    /// A detached foreground task runs once the dispatcher is driven while the app is alive.
    #[test]
    fn sanity_test_tasks_run() {
        let (dispatcher, _background_executor, app) = create_test_app();
        let foreground_executor = app.borrow().foreground_executor.clone();

        let task_ran = Rc::new(Cell::new(false));
        let flag = Rc::clone(&task_ran);
        foreground_executor
            .spawn(async move { flag.set(true) })
            .detach();

        // Drive the dispatcher while `app` is still in scope.
        dispatcher.run_until_parked();

        assert!(
            task_ran.get(),
            "Task should run normally when app is alive"
        );
    }
}