1use crate::{App, PlatformDispatcher, PlatformScheduler};
2use futures::channel::mpsc;
3use futures::prelude::*;
4use open_gpui_core_util::{TryFutureExt, TryFutureExtBacktrace};
5use open_gpui_scheduler::Instant;
6use open_gpui_scheduler::Scheduler;
7use std::{future::Future, marker::PhantomData, mem, pin::Pin, rc::Rc, sync::Arc, time::Duration};
8
9pub use open_gpui_scheduler::{
10 FallibleTask, LocalExecutor as SchedulerLocalExecutor, Priority, Task,
11};
12
13#[derive(Clone)]
16pub struct BackgroundExecutor {
17 inner: open_gpui_scheduler::BackgroundExecutor,
18 dispatcher: Arc<dyn PlatformDispatcher>,
19}
20
21#[derive(Clone)]
24pub struct ForegroundExecutor {
25 inner: open_gpui_scheduler::LocalExecutor,
26 dispatcher: Arc<dyn PlatformDispatcher>,
27 not_send: PhantomData<Rc<()>>,
28}
29
30pub trait TaskExt<T, E> {
34 fn detach_and_log_err(self, cx: &App);
36 fn detach_and_log_err_with_backtrace(self, cx: &App);
39}
40
41impl<T, E> TaskExt<T, E> for Task<Result<T, E>>
42where
43 T: 'static,
44 E: 'static + std::fmt::Display + std::fmt::Debug,
45{
46 #[track_caller]
47 fn detach_and_log_err(self, cx: &App) {
48 let location = core::panic::Location::caller();
49 cx.foreground_executor()
50 .spawn(self.log_tracked_err(*location))
51 .detach();
52 }
53
54 #[track_caller]
55 fn detach_and_log_err_with_backtrace(self, cx: &App) {
56 let location = *core::panic::Location::caller();
57 cx.foreground_executor()
58 .spawn(self.log_tracked_err_with_backtrace(location))
59 .detach();
60 }
61}
62
63impl BackgroundExecutor {
64 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
66 #[cfg(any(test, feature = "test-support"))]
67 let scheduler: Arc<dyn Scheduler> = if let Some(test_dispatcher) = dispatcher.as_test() {
68 test_dispatcher.scheduler().clone()
69 } else {
70 Arc::new(PlatformScheduler::new(dispatcher.clone()))
71 };
72
73 #[cfg(not(any(test, feature = "test-support")))]
74 let scheduler: Arc<dyn Scheduler> = Arc::new(PlatformScheduler::new(dispatcher.clone()));
75
76 Self {
77 inner: open_gpui_scheduler::BackgroundExecutor::new(scheduler),
78 dispatcher,
79 }
80 }
81
82 pub fn scheduler_executor(&self) -> open_gpui_scheduler::BackgroundExecutor {
86 self.inner.clone()
87 }
88
89 #[track_caller]
91 pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
92 where
93 R: Send + 'static,
94 {
95 self.spawn_with_priority(Priority::default(), future.boxed())
96 }
97
98 #[track_caller]
103 pub fn spawn_with_priority<R>(
104 &self,
105 priority: Priority,
106 future: impl Future<Output = R> + Send + 'static,
107 ) -> Task<R>
108 where
109 R: Send + 'static,
110 {
111 if priority == Priority::RealtimeAudio {
112 self.inner.spawn_realtime(future)
113 } else {
114 self.inner.spawn_with_priority(priority, future)
115 }
116 }
117
118 pub async fn scoped<'scope, F>(&self, scheduler: F)
121 where
122 F: FnOnce(&mut Scope<'scope>),
123 {
124 let mut scope = Scope::new(self.clone(), Priority::default());
125 (scheduler)(&mut scope);
126 let spawned = mem::take(&mut scope.futures)
127 .into_iter()
128 .map(|f| self.spawn_with_priority(scope.priority, f))
129 .collect::<Vec<_>>();
130 for task in spawned {
131 task.await;
132 }
133 }
134
135 pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
138 where
139 F: FnOnce(&mut Scope<'scope>),
140 {
141 let mut scope = Scope::new(self.clone(), priority);
142 (scheduler)(&mut scope);
143 let spawned = mem::take(&mut scope.futures)
144 .into_iter()
145 .map(|f| self.spawn_with_priority(scope.priority, f))
146 .collect::<Vec<_>>();
147 for task in spawned {
148 task.await;
149 }
150 }
151
152 pub fn now(&self) -> Instant {
157 self.inner.scheduler().clock().now()
158 }
159
160 #[track_caller]
164 pub fn timer(&self, duration: Duration) -> Task<()> {
165 if duration.is_zero() {
166 return Task::ready(());
167 }
168 self.spawn(self.inner.scheduler().timer(duration))
169 }
170
171 #[cfg(any(test, feature = "test-support"))]
173 pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
174 self.dispatcher.as_test().unwrap().simulate_random_delay()
175 }
176
177 #[cfg(any(test, feature = "test-support"))]
179 pub fn advance_clock(&self, duration: Duration) {
180 self.dispatcher.as_test().unwrap().advance_clock(duration)
181 }
182
183 #[cfg(any(test, feature = "test-support"))]
185 pub fn tick(&self) -> bool {
186 self.dispatcher.as_test().unwrap().scheduler().tick()
187 }
188
189 #[cfg(any(test, feature = "test-support"))]
196 pub fn run_until_parked(&self) {
197 let scheduler = self.dispatcher.as_test().unwrap().scheduler();
198 scheduler.run();
199 }
200
201 #[cfg(any(test, feature = "test-support"))]
203 pub fn allow_parking(&self) {
204 self.dispatcher
205 .as_test()
206 .unwrap()
207 .scheduler()
208 .allow_parking();
209
210 if std::env::var("GPUI_RUN_UNTIL_PARKED_LOG").ok().as_deref() == Some("1") {
211 log::warn!("[open_gpui::executor] allow_parking: enabled");
212 }
213 }
214
215 #[cfg(any(test, feature = "test-support"))]
217 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
218 self.dispatcher
219 .as_test()
220 .unwrap()
221 .scheduler()
222 .set_timeout_ticks(range);
223 }
224
225 #[cfg(any(test, feature = "test-support"))]
227 pub fn forbid_parking(&self) {
228 self.dispatcher
229 .as_test()
230 .unwrap()
231 .scheduler()
232 .forbid_parking();
233 }
234
235 #[cfg(any(test, feature = "test-support"))]
237 pub fn rng(&self) -> open_gpui_scheduler::SharedRng {
238 self.dispatcher.as_test().unwrap().scheduler().rng()
239 }
240
241 pub fn num_cpus(&self) -> usize {
243 #[cfg(any(test, feature = "test-support"))]
244 if let Some(test) = self.dispatcher.as_test() {
245 return test.num_cpus_override().unwrap_or(4);
246 }
247 num_cpus::get()
248 }
249
250 #[cfg(any(test, feature = "test-support"))]
253 pub fn set_num_cpus(&self, count: usize) {
254 self.dispatcher
255 .as_test()
256 .expect("set_num_cpus can only be called on a test executor")
257 .set_num_cpus(count);
258 }
259
260 pub fn is_main_thread(&self) -> bool {
262 self.dispatcher.is_main_thread()
263 }
264
265 #[doc(hidden)]
266 pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
267 &self.dispatcher
268 }
269}
270
271impl ForegroundExecutor {
272 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
274 #[cfg(any(test, feature = "test-support"))]
275 let (scheduler, session_id): (Arc<dyn Scheduler>, _) =
276 if let Some(test_dispatcher) = dispatcher.as_test() {
277 (
278 test_dispatcher.scheduler().clone(),
279 test_dispatcher.session_id(),
280 )
281 } else {
282 let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
283 let inner = platform_scheduler.foreground_executor();
284 return Self {
285 inner,
286 dispatcher,
287 not_send: PhantomData,
288 };
289 };
290
291 #[cfg(not(any(test, feature = "test-support")))]
292 let inner = {
293 let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
294 platform_scheduler.foreground_executor()
295 };
296
297 #[cfg(any(test, feature = "test-support"))]
298 let inner = {
299 let scheduler_for_dispatch = Arc::downgrade(&scheduler);
300 open_gpui_scheduler::LocalExecutor::new(session_id, scheduler, move |runnable| {
301 if let Some(scheduler) = scheduler_for_dispatch.upgrade() {
302 scheduler.schedule_local(session_id, runnable);
303 }
304 })
305 };
306
307 Self {
308 inner,
309 dispatcher,
310 not_send: PhantomData,
311 }
312 }
313
314 #[track_caller]
316 pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
317 where
318 R: 'static,
319 {
320 self.inner.spawn(future.boxed_local())
321 }
322
323 #[track_caller]
325 pub fn spawn_with_priority<R>(
326 &self,
327 _priority: Priority,
328 future: impl Future<Output = R> + 'static,
329 ) -> Task<R>
330 where
331 R: 'static,
332 {
333 self.inner.spawn(future)
335 }
336
337 #[cfg(any(test, feature = "test-support"))]
339 #[track_caller]
340 pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
341 use std::cell::Cell;
342
343 let scheduler = self.inner.scheduler();
344
345 let output = Cell::new(None);
346 let future = async {
347 output.set(Some(future.await));
348 };
349 let mut future = std::pin::pin!(future);
350
351 scheduler.block(None, future.as_mut(), None);
355
356 output.take().expect("block_test future did not complete")
357 }
358
359 pub fn block_on<R>(&self, future: impl Future<Output = R>) -> R {
362 self.inner.block_on(future)
363 }
364
365 pub fn block_with_timeout<R, Fut: Future<Output = R>>(
367 &self,
368 duration: Duration,
369 future: Fut,
370 ) -> Result<R, impl Future<Output = R> + use<R, Fut>> {
371 self.inner.block_with_timeout(duration, future)
372 }
373
374 #[doc(hidden)]
375 pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
376 &self.dispatcher
377 }
378
379 #[doc(hidden)]
380 pub fn scheduler_executor(&self) -> SchedulerLocalExecutor {
381 self.inner.clone()
382 }
383}
384
385pub struct Scope<'a> {
387 executor: BackgroundExecutor,
388 priority: Priority,
389 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
390 tx: Option<mpsc::Sender<()>>,
391 rx: mpsc::Receiver<()>,
392 lifetime: PhantomData<&'a ()>,
393}
394
395impl<'a> Scope<'a> {
396 fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
397 let (tx, rx) = mpsc::channel(1);
398 Self {
399 executor,
400 priority,
401 tx: Some(tx),
402 rx,
403 futures: Default::default(),
404 lifetime: PhantomData,
405 }
406 }
407
408 pub fn num_cpus(&self) -> usize {
410 self.executor.num_cpus()
411 }
412
413 #[track_caller]
415 pub fn spawn<F>(&mut self, f: F)
416 where
417 F: Future<Output = ()> + Send + 'a,
418 {
419 let tx = self.tx.clone().unwrap();
420
421 let f = unsafe {
424 mem::transmute::<
425 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
426 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
427 >(Box::pin(async move {
428 f.await;
429 drop(tx);
430 }))
431 };
432 self.futures.push(f);
433 }
434}
435
436impl Drop for Scope<'_> {
437 fn drop(&mut self) {
438 self.tx.take().unwrap();
439
440 let future = async {
443 self.rx.next().await;
444 };
445 let mut future = std::pin::pin!(future);
446 self.executor
447 .inner
448 .scheduler()
449 .block(None, future.as_mut(), None);
450 }
451}
452
453#[cfg(test)]
454mod test {
455 use super::*;
456 use crate::{App, TestDispatcher, TestPlatform};
457 use std::cell::RefCell;
458
459 fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
462 let dispatcher = TestDispatcher::new(0);
463 let arc_dispatcher = Arc::new(dispatcher.clone());
464 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
465 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
466
467 let platform = TestPlatform::new(background_executor.clone(), foreground_executor);
468 let asset_source = Arc::new(());
469 let http_client = open_gpui_http_client::FakeHttpClient::with_404_response();
470
471 let app = App::new_app(platform, asset_source, http_client);
472 (dispatcher, background_executor, app)
473 }
474
475 #[test]
476 fn sanity_test_tasks_run() {
477 let (dispatcher, _background_executor, app) = create_test_app();
478 let foreground_executor = app.borrow().foreground_executor.clone();
479
480 let task_ran = Rc::new(RefCell::new(false));
481
482 foreground_executor
483 .spawn({
484 let task_ran = Rc::clone(&task_ran);
485 async move {
486 *task_ran.borrow_mut() = true;
487 }
488 })
489 .detach();
490
491 dispatcher.run_until_parked();
493
494 assert!(
496 *task_ran.borrow(),
497 "Task should run normally when app is alive"
498 );
499 }
500}