Skip to main content

open_gpui/
executor.rs

1use crate::{App, PlatformDispatcher, PlatformScheduler};
2use futures::channel::mpsc;
3use futures::prelude::*;
4use open_gpui_core_util::{TryFutureExt, TryFutureExtBacktrace};
5use open_gpui_scheduler::Instant;
6use open_gpui_scheduler::Scheduler;
7use std::{future::Future, marker::PhantomData, mem, pin::Pin, rc::Rc, sync::Arc, time::Duration};
8
9pub use open_gpui_scheduler::{
10    FallibleTask, LocalExecutor as SchedulerLocalExecutor, Priority, Task,
11};
12
13/// A pointer to the executor that is currently running,
14/// for spawning background tasks.
15#[derive(Clone)]
16pub struct BackgroundExecutor {
17    inner: open_gpui_scheduler::BackgroundExecutor,
18    dispatcher: Arc<dyn PlatformDispatcher>,
19}
20
21/// A pointer to the executor that is currently running,
22/// for spawning tasks on the main thread.
23#[derive(Clone)]
24pub struct ForegroundExecutor {
25    inner: open_gpui_scheduler::LocalExecutor,
26    dispatcher: Arc<dyn PlatformDispatcher>,
27    not_send: PhantomData<Rc<()>>,
28}
29
30/// Extension trait for `Task<Result<T, E>>` that adds `detach_and_log_err` with an `&App` context.
31///
32/// This trait is automatically implemented for all `Task<Result<T, E>>` types.
33pub trait TaskExt<T, E> {
34    /// Run the task to completion in the background and log any errors that occur.
35    fn detach_and_log_err(self, cx: &App);
36    /// Like [`Self::detach_and_log_err`], but uses `{:?}` formatting on failure so `anyhow::Error`
37    /// values emit their full backtrace. Prefer `detach_and_log_err` unless a backtrace is wanted.
38    fn detach_and_log_err_with_backtrace(self, cx: &App);
39}
40
41impl<T, E> TaskExt<T, E> for Task<Result<T, E>>
42where
43    T: 'static,
44    E: 'static + std::fmt::Display + std::fmt::Debug,
45{
46    #[track_caller]
47    fn detach_and_log_err(self, cx: &App) {
48        let location = core::panic::Location::caller();
49        cx.foreground_executor()
50            .spawn(self.log_tracked_err(*location))
51            .detach();
52    }
53
54    #[track_caller]
55    fn detach_and_log_err_with_backtrace(self, cx: &App) {
56        let location = *core::panic::Location::caller();
57        cx.foreground_executor()
58            .spawn(self.log_tracked_err_with_backtrace(location))
59            .detach();
60    }
61}
62
63impl BackgroundExecutor {
64    /// Creates a new BackgroundExecutor from the given PlatformDispatcher.
65    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
66        #[cfg(any(test, feature = "test-support"))]
67        let scheduler: Arc<dyn Scheduler> = if let Some(test_dispatcher) = dispatcher.as_test() {
68            test_dispatcher.scheduler().clone()
69        } else {
70            Arc::new(PlatformScheduler::new(dispatcher.clone()))
71        };
72
73        #[cfg(not(any(test, feature = "test-support")))]
74        let scheduler: Arc<dyn Scheduler> = Arc::new(PlatformScheduler::new(dispatcher.clone()));
75
76        Self {
77            inner: open_gpui_scheduler::BackgroundExecutor::new(scheduler),
78            dispatcher,
79        }
80    }
81
82    /// Returns the underlying open_gpui_scheduler::BackgroundExecutor.
83    ///
84    /// This is used by Ex to pass the executor to thread/worktree code.
85    pub fn scheduler_executor(&self) -> open_gpui_scheduler::BackgroundExecutor {
86        self.inner.clone()
87    }
88
89    /// Enqueues the given future to be run to completion on a background thread.
90    #[track_caller]
91    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
92    where
93        R: Send + 'static,
94    {
95        self.spawn_with_priority(Priority::default(), future.boxed())
96    }
97
98    /// Enqueues the given future to be run to completion on a background thread with the given priority.
99    ///
100    /// When `Priority::RealtimeAudio` is used, the task runs on a dedicated thread with
101    /// realtime scheduling priority, suitable for audio processing.
102    #[track_caller]
103    pub fn spawn_with_priority<R>(
104        &self,
105        priority: Priority,
106        future: impl Future<Output = R> + Send + 'static,
107    ) -> Task<R>
108    where
109        R: Send + 'static,
110    {
111        if priority == Priority::RealtimeAudio {
112            self.inner.spawn_realtime(future)
113        } else {
114            self.inner.spawn_with_priority(priority, future)
115        }
116    }
117
118    /// Scoped lets you start a number of tasks and waits
119    /// for all of them to complete before returning.
120    pub async fn scoped<'scope, F>(&self, scheduler: F)
121    where
122        F: FnOnce(&mut Scope<'scope>),
123    {
124        let mut scope = Scope::new(self.clone(), Priority::default());
125        (scheduler)(&mut scope);
126        let spawned = mem::take(&mut scope.futures)
127            .into_iter()
128            .map(|f| self.spawn_with_priority(scope.priority, f))
129            .collect::<Vec<_>>();
130        for task in spawned {
131            task.await;
132        }
133    }
134
135    /// Scoped lets you start a number of tasks and waits
136    /// for all of them to complete before returning.
137    pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
138    where
139        F: FnOnce(&mut Scope<'scope>),
140    {
141        let mut scope = Scope::new(self.clone(), priority);
142        (scheduler)(&mut scope);
143        let spawned = mem::take(&mut scope.futures)
144            .into_iter()
145            .map(|f| self.spawn_with_priority(scope.priority, f))
146            .collect::<Vec<_>>();
147        for task in spawned {
148            task.await;
149        }
150    }
151
152    /// Get the current time.
153    ///
154    /// Calling this instead of `std::time::Instant::now` allows the use
155    /// of fake timers in tests.
156    pub fn now(&self) -> Instant {
157        self.inner.scheduler().clock().now()
158    }
159
160    /// Returns a task that will complete after the given duration.
161    /// Depending on other concurrent tasks the elapsed duration may be longer
162    /// than requested.
163    #[track_caller]
164    pub fn timer(&self, duration: Duration) -> Task<()> {
165        if duration.is_zero() {
166            return Task::ready(());
167        }
168        self.spawn(self.inner.scheduler().timer(duration))
169    }
170
171    /// In tests, run an arbitrary number of tasks (determined by the SEED environment variable)
172    #[cfg(any(test, feature = "test-support"))]
173    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
174        self.dispatcher.as_test().unwrap().simulate_random_delay()
175    }
176
177    /// In tests, move time forward. This does not run any tasks, but does make `timer`s ready.
178    #[cfg(any(test, feature = "test-support"))]
179    pub fn advance_clock(&self, duration: Duration) {
180        self.dispatcher.as_test().unwrap().advance_clock(duration)
181    }
182
183    /// In tests, run one task.
184    #[cfg(any(test, feature = "test-support"))]
185    pub fn tick(&self) -> bool {
186        self.dispatcher.as_test().unwrap().scheduler().tick()
187    }
188
189    /// In tests, run tasks until the scheduler would park.
190    ///
191    /// Under the scheduler-backed test dispatcher, `tick()` will not advance the clock, so a pending
192    /// timer can keep `has_pending_tasks()` true even after all currently-runnable tasks have been
193    /// drained. To preserve the historical semantics that tests relied on (drain all work that can
194    /// make progress), we advance the clock to the next timer when no runnable tasks remain.
195    #[cfg(any(test, feature = "test-support"))]
196    pub fn run_until_parked(&self) {
197        let scheduler = self.dispatcher.as_test().unwrap().scheduler();
198        scheduler.run();
199    }
200
201    /// In tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
202    #[cfg(any(test, feature = "test-support"))]
203    pub fn allow_parking(&self) {
204        self.dispatcher
205            .as_test()
206            .unwrap()
207            .scheduler()
208            .allow_parking();
209
210        if std::env::var("GPUI_RUN_UNTIL_PARKED_LOG").ok().as_deref() == Some("1") {
211            log::warn!("[open_gpui::executor] allow_parking: enabled");
212        }
213    }
214
215    /// Sets the range of ticks to run before timing out in block_on.
216    #[cfg(any(test, feature = "test-support"))]
217    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
218        self.dispatcher
219            .as_test()
220            .unwrap()
221            .scheduler()
222            .set_timeout_ticks(range);
223    }
224
225    /// Undoes the effect of [`Self::allow_parking`].
226    #[cfg(any(test, feature = "test-support"))]
227    pub fn forbid_parking(&self) {
228        self.dispatcher
229            .as_test()
230            .unwrap()
231            .scheduler()
232            .forbid_parking();
233    }
234
235    /// In tests, returns the rng used by the dispatcher.
236    #[cfg(any(test, feature = "test-support"))]
237    pub fn rng(&self) -> open_gpui_scheduler::SharedRng {
238        self.dispatcher.as_test().unwrap().scheduler().rng()
239    }
240
241    /// How many CPUs are available to the dispatcher.
242    pub fn num_cpus(&self) -> usize {
243        #[cfg(any(test, feature = "test-support"))]
244        if let Some(test) = self.dispatcher.as_test() {
245            return test.num_cpus_override().unwrap_or(4);
246        }
247        num_cpus::get()
248    }
249
250    /// Override the number of CPUs reported by this executor in tests.
251    /// Panics if not called on a test executor.
252    #[cfg(any(test, feature = "test-support"))]
253    pub fn set_num_cpus(&self, count: usize) {
254        self.dispatcher
255            .as_test()
256            .expect("set_num_cpus can only be called on a test executor")
257            .set_num_cpus(count);
258    }
259
260    /// Whether we're on the main thread.
261    pub fn is_main_thread(&self) -> bool {
262        self.dispatcher.is_main_thread()
263    }
264
265    #[doc(hidden)]
266    pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
267        &self.dispatcher
268    }
269}
270
271impl ForegroundExecutor {
272    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
273    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
274        #[cfg(any(test, feature = "test-support"))]
275        let (scheduler, session_id): (Arc<dyn Scheduler>, _) =
276            if let Some(test_dispatcher) = dispatcher.as_test() {
277                (
278                    test_dispatcher.scheduler().clone(),
279                    test_dispatcher.session_id(),
280                )
281            } else {
282                let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
283                let inner = platform_scheduler.foreground_executor();
284                return Self {
285                    inner,
286                    dispatcher,
287                    not_send: PhantomData,
288                };
289            };
290
291        #[cfg(not(any(test, feature = "test-support")))]
292        let inner = {
293            let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
294            platform_scheduler.foreground_executor()
295        };
296
297        #[cfg(any(test, feature = "test-support"))]
298        let inner = {
299            let scheduler_for_dispatch = Arc::downgrade(&scheduler);
300            open_gpui_scheduler::LocalExecutor::new(session_id, scheduler, move |runnable| {
301                if let Some(scheduler) = scheduler_for_dispatch.upgrade() {
302                    scheduler.schedule_local(session_id, runnable);
303                }
304            })
305        };
306
307        Self {
308            inner,
309            dispatcher,
310            not_send: PhantomData,
311        }
312    }
313
314    /// Enqueues the given Task to run on the main thread.
315    #[track_caller]
316    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
317    where
318        R: 'static,
319    {
320        self.inner.spawn(future.boxed_local())
321    }
322
323    /// Enqueues the given Task to run on the main thread with the given priority.
324    #[track_caller]
325    pub fn spawn_with_priority<R>(
326        &self,
327        _priority: Priority,
328        future: impl Future<Output = R> + 'static,
329    ) -> Task<R>
330    where
331        R: 'static,
332    {
333        // Priority is ignored for foreground tasks - they run in order on the main thread
334        self.inner.spawn(future)
335    }
336
337    /// Used by the test harness to run an async test in a synchronous fashion.
338    #[cfg(any(test, feature = "test-support"))]
339    #[track_caller]
340    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
341        use std::cell::Cell;
342
343        let scheduler = self.inner.scheduler();
344
345        let output = Cell::new(None);
346        let future = async {
347            output.set(Some(future.await));
348        };
349        let mut future = std::pin::pin!(future);
350
351        // In async GPUI tests, we must allow foreground tasks scheduled by the test itself
352        // (which are associated with the test session) to make progress while we block.
353        // Otherwise, awaiting futures that depend on same-session foreground work can deadlock.
354        scheduler.block(None, future.as_mut(), None);
355
356        output.take().expect("block_test future did not complete")
357    }
358
359    /// Block the current thread until the given future resolves.
360    /// Consider using `block_with_timeout` instead.
361    pub fn block_on<R>(&self, future: impl Future<Output = R>) -> R {
362        self.inner.block_on(future)
363    }
364
365    /// Block the current thread until the given future resolves or the timeout elapses.
366    pub fn block_with_timeout<R, Fut: Future<Output = R>>(
367        &self,
368        duration: Duration,
369        future: Fut,
370    ) -> Result<R, impl Future<Output = R> + use<R, Fut>> {
371        self.inner.block_with_timeout(duration, future)
372    }
373
374    #[doc(hidden)]
375    pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
376        &self.dispatcher
377    }
378
379    #[doc(hidden)]
380    pub fn scheduler_executor(&self) -> SchedulerLocalExecutor {
381        self.inner.clone()
382    }
383}
384
385/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
386pub struct Scope<'a> {
387    executor: BackgroundExecutor,
388    priority: Priority,
389    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
390    tx: Option<mpsc::Sender<()>>,
391    rx: mpsc::Receiver<()>,
392    lifetime: PhantomData<&'a ()>,
393}
394
395impl<'a> Scope<'a> {
396    fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
397        let (tx, rx) = mpsc::channel(1);
398        Self {
399            executor,
400            priority,
401            tx: Some(tx),
402            rx,
403            futures: Default::default(),
404            lifetime: PhantomData,
405        }
406    }
407
408    /// How many CPUs are available to the dispatcher.
409    pub fn num_cpus(&self) -> usize {
410        self.executor.num_cpus()
411    }
412
413    /// Spawn a future into this scope.
414    #[track_caller]
415    pub fn spawn<F>(&mut self, f: F)
416    where
417        F: Future<Output = ()> + Send + 'a,
418    {
419        let tx = self.tx.clone().unwrap();
420
421        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
422        // dropping this `Scope` blocks until all of the futures have resolved.
423        let f = unsafe {
424            mem::transmute::<
425                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
426                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
427            >(Box::pin(async move {
428                f.await;
429                drop(tx);
430            }))
431        };
432        self.futures.push(f);
433    }
434}
435
436impl Drop for Scope<'_> {
437    fn drop(&mut self) {
438        self.tx.take().unwrap();
439
440        // Wait until the channel is closed, which means that all of the spawned
441        // futures have resolved.
442        let future = async {
443            self.rx.next().await;
444        };
445        let mut future = std::pin::pin!(future);
446        self.executor
447            .inner
448            .scheduler()
449            .block(None, future.as_mut(), None);
450    }
451}
452
#[cfg(test)]
mod test {
    use super::*;
    use crate::{App, TestDispatcher, TestPlatform};
    use std::cell::RefCell;

    /// Builds an app on top of a seed-0 test dispatcher.
    /// Returns (dispatcher, background_executor, app).
    fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
        let dispatcher = TestDispatcher::new(0);
        let shared_dispatcher = Arc::new(dispatcher.clone());

        // Both executors are built from the same dispatcher.
        let background_executor = BackgroundExecutor::new(shared_dispatcher.clone());
        let foreground_executor = ForegroundExecutor::new(shared_dispatcher);
        let platform = TestPlatform::new(background_executor.clone(), foreground_executor);

        let app = App::new_app(
            platform,
            Arc::new(()),
            open_gpui_http_client::FakeHttpClient::with_404_response(),
        );
        (dispatcher, background_executor, app)
    }

    /// A detached foreground task runs once the dispatcher is driven until parked.
    #[test]
    fn sanity_test_tasks_run() {
        let (dispatcher, _background_executor, app) = create_test_app();
        let foreground_executor = app.borrow().foreground_executor.clone();

        let ran = Rc::new(RefCell::new(false));
        let ran_in_task = Rc::clone(&ran);

        foreground_executor
            .spawn(async move {
                *ran_in_task.borrow_mut() = true;
            })
            .detach();

        // Drive the dispatcher while the app is still alive.
        dispatcher.run_until_parked();

        // The flag must have been flipped by the task.
        assert!(*ran.borrow(), "Task should run normally when app is alive");
    }
}