use crate::{App, PlatformDispatcher, RunnableMeta, RunnableVariant, TaskTiming, profiler};
use async_task::Runnable;
use futures::channel::mpsc;
use parking_lot::{Condvar, Mutex};
use smol::prelude::*;
use std::{
fmt::Debug,
marker::PhantomData,
mem::{self, ManuallyDrop},
num::NonZeroUsize,
panic::Location,
pin::Pin,
rc::Rc,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
task::{Context, Poll},
thread::{self, ThreadId},
time::{Duration, Instant},
};
use util::TryFutureExt as _;
use waker_fn::waker_fn;
#[cfg(any(test, feature = "test-support"))]
use rand::rngs::StdRng;
#[derive(Clone)]
pub struct BackgroundExecutor {
#[doc(hidden)]
pub dispatcher: Arc<dyn PlatformDispatcher>,
}
#[derive(Clone)]
pub struct ForegroundExecutor {
#[doc(hidden)]
pub dispatcher: Arc<dyn PlatformDispatcher>,
liveness: std::sync::Weak<()>,
not_send: PhantomData<Rc<()>>,
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum RealtimePriority {
Audio,
#[default]
Other,
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum Priority {
Realtime(RealtimePriority),
High,
#[default]
Medium,
Low,
}
impl Priority {
#[allow(dead_code)]
pub(crate) const fn probability(&self) -> u32 {
match self {
Priority::Realtime(_) => 0,
Priority::High => 60,
Priority::Medium => 30,
Priority::Low => 10,
}
}
}
#[must_use]
#[derive(Debug)]
pub struct Task<T>(TaskState<T>);
#[derive(Debug)]
enum TaskState<T> {
Ready(Option<T>),
Spawned(async_task::Task<T, RunnableMeta>),
}
impl<T> Task<T> {
pub fn ready(val: T) -> Self {
Task(TaskState::Ready(Some(val)))
}
pub fn detach(self) {
match self {
Task(TaskState::Ready(_)) => {}
Task(TaskState::Spawned(task)) => task.detach(),
}
}
pub fn fallible(self) -> FallibleTask<T> {
FallibleTask(match self.0 {
TaskState::Ready(val) => FallibleTaskState::Ready(val),
TaskState::Spawned(task) => FallibleTaskState::Spawned(task.fallible()),
})
}
}
impl<E, T> Task<Result<T, E>>
where
T: 'static,
E: 'static + Debug,
{
#[track_caller]
pub fn detach_and_log_err(self, cx: &App) {
let location = core::panic::Location::caller();
cx.foreground_executor()
.spawn(self.log_tracked_err(*location))
.detach();
}
}
impl<T> Future for Task<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match unsafe { self.get_unchecked_mut() } {
Task(TaskState::Ready(val)) => Poll::Ready(val.take().unwrap()),
Task(TaskState::Spawned(task)) => task.poll(cx),
}
}
}
#[must_use]
pub struct FallibleTask<T>(FallibleTaskState<T>);
enum FallibleTaskState<T> {
Ready(Option<T>),
Spawned(async_task::FallibleTask<T, RunnableMeta>),
}
impl<T> FallibleTask<T> {
pub fn ready(val: T) -> Self {
FallibleTask(FallibleTaskState::Ready(Some(val)))
}
pub fn detach(self) {
match self.0 {
FallibleTaskState::Ready(_) => {}
FallibleTaskState::Spawned(task) => task.detach(),
}
}
}
impl<T> Future for FallibleTask<T> {
type Output = Option<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match unsafe { self.get_unchecked_mut() } {
FallibleTask(FallibleTaskState::Ready(val)) => Poll::Ready(val.take()),
FallibleTask(FallibleTaskState::Spawned(task)) => Pin::new(task).poll(cx),
}
}
}
impl<T> std::fmt::Debug for FallibleTask<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.0 {
FallibleTaskState::Ready(_) => f.debug_tuple("FallibleTask::Ready").finish(),
FallibleTaskState::Spawned(task) => {
f.debug_tuple("FallibleTask::Spawned").field(task).finish()
}
}
}
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct TaskLabel(NonZeroUsize);
impl Default for TaskLabel {
fn default() -> Self {
Self::new()
}
}
impl TaskLabel {
pub fn new() -> Self {
static NEXT_TASK_LABEL: AtomicUsize = AtomicUsize::new(1);
Self(
NEXT_TASK_LABEL
.fetch_add(1, Ordering::SeqCst)
.try_into()
.unwrap(),
)
}
}
type AnyLocalFuture<R> = Pin<Box<dyn 'static + Future<Output = R>>>;
type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;
impl BackgroundExecutor {
#[doc(hidden)]
pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
Self { dispatcher }
}
#[track_caller]
pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
where
R: Send + 'static,
{
self.spawn_with_priority(Priority::default(), future)
}
#[track_caller]
pub fn spawn_with_priority<R>(
&self,
priority: Priority,
future: impl Future<Output = R> + Send + 'static,
) -> Task<R>
where
R: Send + 'static,
{
self.spawn_internal::<R>(Box::pin(future), None, priority)
}
pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
where
R: Send,
{
struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
impl Drop for NotifyOnDrop<'_> {
fn drop(&mut self) {
*self.0.1.lock() = true;
self.0.0.notify_all();
}
}
struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
impl Drop for WaitOnDrop<'_> {
fn drop(&mut self) {
let mut done = self.0.1.lock();
if !*done {
self.0.0.wait(&mut done);
}
}
}
let dispatcher = self.dispatcher.clone();
let location = core::panic::Location::caller();
let pair = &(Condvar::new(), Mutex::new(false));
let _wait_guard = WaitOnDrop(pair);
let (runnable, task) = unsafe {
async_task::Builder::new()
.metadata(RunnableMeta {
location,
app: None,
})
.spawn_unchecked(
move |_| async {
let _notify_guard = NotifyOnDrop(pair);
future.await
},
move |runnable| {
dispatcher.dispatch(
RunnableVariant::Meta(runnable),
None,
Priority::default(),
)
},
)
};
runnable.schedule();
task.await
}
#[track_caller]
pub fn spawn_labeled<R>(
&self,
label: TaskLabel,
future: impl Future<Output = R> + Send + 'static,
) -> Task<R>
where
R: Send + 'static,
{
self.spawn_internal::<R>(Box::pin(future), Some(label), Priority::default())
}
#[track_caller]
fn spawn_internal<R: Send + 'static>(
&self,
future: AnyFuture<R>,
label: Option<TaskLabel>,
priority: Priority,
) -> Task<R> {
let dispatcher = self.dispatcher.clone();
let (runnable, task) = if let Priority::Realtime(realtime) = priority {
let location = core::panic::Location::caller();
let (mut tx, rx) = flume::bounded::<Runnable<RunnableMeta>>(1);
dispatcher.spawn_realtime(
realtime,
Box::new(move || {
while let Ok(runnable) = rx.recv() {
let start = Instant::now();
let location = runnable.metadata().location;
let mut timing = TaskTiming {
location,
start,
end: None,
};
profiler::add_task_timing(timing);
runnable.run();
let end = Instant::now();
timing.end = Some(end);
profiler::add_task_timing(timing);
}
}),
);
async_task::Builder::new()
.metadata(RunnableMeta {
location,
app: None,
})
.spawn(
move |_| future,
move |runnable| {
let _ = tx.send(runnable);
},
)
} else {
let location = core::panic::Location::caller();
async_task::Builder::new()
.metadata(RunnableMeta {
location,
app: None,
})
.spawn(
move |_| future,
move |runnable| {
dispatcher.dispatch(RunnableVariant::Meta(runnable), label, priority)
},
)
};
runnable.schedule();
Task(TaskState::Spawned(task))
}
#[cfg(any(test, feature = "test-support"))]
#[track_caller]
pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
if let Ok(value) = self.block_internal(false, future, None) {
value
} else {
unreachable!()
}
}
pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
if let Ok(value) = self.block_internal(true, future, None) {
value
} else {
unreachable!()
}
}
#[cfg(not(any(test, feature = "test-support")))]
pub(crate) fn block_internal<Fut: Future>(
&self,
_background_only: bool,
future: Fut,
timeout: Option<Duration>,
) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
use std::time::Instant;
let mut future = Box::pin(future);
if timeout == Some(Duration::ZERO) {
return Err(future);
}
let deadline = timeout.map(|timeout| Instant::now() + timeout);
let parker = parking::Parker::new();
let unparker = parker.unparker();
let waker = waker_fn(move || {
unparker.unpark();
});
let mut cx = std::task::Context::from_waker(&waker);
loop {
match future.as_mut().poll(&mut cx) {
Poll::Ready(result) => return Ok(result),
Poll::Pending => {
let timeout =
deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
if let Some(timeout) = timeout {
if !parker.park_timeout(timeout)
&& deadline.is_some_and(|deadline| deadline < Instant::now())
{
return Err(future);
}
} else {
parker.park();
}
}
}
}
}
#[cfg(any(test, feature = "test-support"))]
#[track_caller]
pub(crate) fn block_internal<Fut: Future>(
&self,
background_only: bool,
future: Fut,
timeout: Option<Duration>,
) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
use std::sync::atomic::AtomicBool;
use std::time::Instant;
use parking::Parker;
let mut future = Box::pin(future);
if timeout == Some(Duration::ZERO) {
return Err(future);
}
let Some(dispatcher) = self.dispatcher.as_test() else {
let deadline = timeout.map(|timeout| Instant::now() + timeout);
let parker = Parker::new();
let unparker = parker.unparker();
let waker = waker_fn(move || {
unparker.unpark();
});
let mut cx = std::task::Context::from_waker(&waker);
loop {
match future.as_mut().poll(&mut cx) {
Poll::Ready(result) => return Ok(result),
Poll::Pending => {
let timeout = deadline
.map(|deadline| deadline.saturating_duration_since(Instant::now()));
if let Some(timeout) = timeout {
if !parker.park_timeout(timeout)
&& deadline.is_some_and(|deadline| deadline < Instant::now())
{
return Err(future);
}
} else {
parker.park();
}
}
}
}
};
let mut max_ticks = if timeout.is_some() {
dispatcher.gen_block_on_ticks()
} else {
usize::MAX
};
let parker = Parker::new();
let unparker = parker.unparker();
let awoken = Arc::new(AtomicBool::new(false));
let waker = waker_fn({
let awoken = awoken.clone();
let unparker = unparker.clone();
move || {
awoken.store(true, Ordering::SeqCst);
unparker.unpark();
}
});
let mut cx = std::task::Context::from_waker(&waker);
let duration = Duration::from_secs(
option_env!("GPUI_TEST_TIMEOUT")
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(180),
);
let mut test_should_end_by = Instant::now() + duration;
loop {
match future.as_mut().poll(&mut cx) {
Poll::Ready(result) => return Ok(result),
Poll::Pending => {
if max_ticks == 0 {
return Err(future);
}
max_ticks -= 1;
if !dispatcher.tick(background_only) {
if awoken.swap(false, Ordering::SeqCst) {
continue;
}
if !dispatcher.parking_allowed() {
if dispatcher.advance_clock_to_next_delayed() {
continue;
}
let mut backtrace_message = String::new();
let mut waiting_message = String::new();
if let Some(backtrace) = dispatcher.waiting_backtrace() {
backtrace_message =
format!("\nbacktrace of waiting future:\n{:?}", backtrace);
}
if let Some(waiting_hint) = dispatcher.waiting_hint() {
waiting_message = format!("\n waiting on: {}\n", waiting_hint);
}
panic!(
"parked with nothing left to run{waiting_message}{backtrace_message}",
)
}
dispatcher.push_unparker(unparker.clone());
parker.park_timeout(Duration::from_millis(1));
if Instant::now() > test_should_end_by {
panic!("test timed out after {duration:?} with allow_parking")
}
}
}
}
}
}
pub fn block_with_timeout<Fut: Future>(
&self,
duration: Duration,
future: Fut,
) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
self.block_internal(true, future, Some(duration))
}
pub async fn scoped<'scope, F>(&self, scheduler: F)
where
F: FnOnce(&mut Scope<'scope>),
{
let mut scope = Scope::new(self.clone(), Priority::default());
(scheduler)(&mut scope);
let spawned = mem::take(&mut scope.futures)
.into_iter()
.map(|f| self.spawn_with_priority(scope.priority, f))
.collect::<Vec<_>>();
for task in spawned {
task.await;
}
}
pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
where
F: FnOnce(&mut Scope<'scope>),
{
let mut scope = Scope::new(self.clone(), priority);
(scheduler)(&mut scope);
let spawned = mem::take(&mut scope.futures)
.into_iter()
.map(|f| self.spawn_with_priority(scope.priority, f))
.collect::<Vec<_>>();
for task in spawned {
task.await;
}
}
pub fn now(&self) -> Instant {
self.dispatcher.now()
}
pub fn timer(&self, duration: Duration) -> Task<()> {
if duration.is_zero() {
return Task::ready(());
}
let location = core::panic::Location::caller();
let (runnable, task) = async_task::Builder::new()
.metadata(RunnableMeta {
location,
app: None,
})
.spawn(move |_| async move {}, {
let dispatcher = self.dispatcher.clone();
move |runnable| dispatcher.dispatch_after(duration, RunnableVariant::Meta(runnable))
});
runnable.schedule();
Task(TaskState::Spawned(task))
}
#[cfg(any(test, feature = "test-support"))]
pub fn start_waiting(&self) {
self.dispatcher.as_test().unwrap().start_waiting();
}
#[cfg(any(test, feature = "test-support"))]
pub fn finish_waiting(&self) {
self.dispatcher.as_test().unwrap().finish_waiting();
}
#[cfg(any(test, feature = "test-support"))]
pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
self.dispatcher.as_test().unwrap().simulate_random_delay()
}
#[cfg(any(test, feature = "test-support"))]
pub fn deprioritize(&self, task_label: TaskLabel) {
self.dispatcher.as_test().unwrap().deprioritize(task_label)
}
#[cfg(any(test, feature = "test-support"))]
pub fn advance_clock(&self, duration: Duration) {
self.dispatcher.as_test().unwrap().advance_clock(duration)
}
#[cfg(any(test, feature = "test-support"))]
pub fn tick(&self) -> bool {
self.dispatcher.as_test().unwrap().tick(false)
}
#[cfg(any(test, feature = "test-support"))]
pub fn run_until_parked(&self) {
self.dispatcher.as_test().unwrap().run_until_parked()
}
#[cfg(any(test, feature = "test-support"))]
pub fn allow_parking(&self) {
self.dispatcher.as_test().unwrap().allow_parking();
}
#[cfg(any(test, feature = "test-support"))]
pub fn forbid_parking(&self) {
self.dispatcher.as_test().unwrap().forbid_parking();
}
#[cfg(any(test, feature = "test-support"))]
pub fn set_waiting_hint(&self, msg: Option<String>) {
self.dispatcher.as_test().unwrap().set_waiting_hint(msg);
}
#[cfg(any(test, feature = "test-support"))]
pub fn rng(&self) -> StdRng {
self.dispatcher.as_test().unwrap().rng()
}
pub fn num_cpus(&self) -> usize {
#[cfg(any(test, feature = "test-support"))]
return 4;
#[cfg(not(any(test, feature = "test-support")))]
return num_cpus::get();
}
pub fn is_main_thread(&self) -> bool {
self.dispatcher.is_main_thread()
}
#[cfg(any(test, feature = "test-support"))]
pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
self.dispatcher.as_test().unwrap().set_block_on_ticks(range);
}
}
impl ForegroundExecutor {
pub fn new(dispatcher: Arc<dyn PlatformDispatcher>, liveness: std::sync::Weak<()>) -> Self {
Self {
dispatcher,
liveness,
not_send: PhantomData,
}
}
#[track_caller]
pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
where
R: 'static,
{
self.inner_spawn(self.liveness.clone(), Priority::default(), future)
}
#[track_caller]
pub fn spawn_with_priority<R>(
&self,
priority: Priority,
future: impl Future<Output = R> + 'static,
) -> Task<R>
where
R: 'static,
{
self.inner_spawn(self.liveness.clone(), priority, future)
}
#[track_caller]
pub(crate) fn inner_spawn<R>(
&self,
app: std::sync::Weak<()>,
priority: Priority,
future: impl Future<Output = R> + 'static,
) -> Task<R>
where
R: 'static,
{
let dispatcher = self.dispatcher.clone();
let location = core::panic::Location::caller();
#[track_caller]
fn inner<R: 'static>(
dispatcher: Arc<dyn PlatformDispatcher>,
future: AnyLocalFuture<R>,
location: &'static core::panic::Location<'static>,
app: std::sync::Weak<()>,
priority: Priority,
) -> Task<R> {
let (runnable, task) = spawn_local_with_source_location(
future,
move |runnable| {
dispatcher.dispatch_on_main_thread(RunnableVariant::Meta(runnable), priority)
},
RunnableMeta {
location,
app: Some(app),
},
);
runnable.schedule();
Task(TaskState::Spawned(task))
}
inner::<R>(dispatcher, Box::pin(future), location, app, priority)
}
}
#[track_caller]
fn spawn_local_with_source_location<Fut, S, M>(
future: Fut,
schedule: S,
metadata: M,
) -> (Runnable<M>, async_task::Task<Fut::Output, M>)
where
Fut: Future + 'static,
Fut::Output: 'static,
S: async_task::Schedule<M> + Send + Sync + 'static,
M: 'static,
{
#[inline]
fn thread_id() -> ThreadId {
std::thread_local! {
static ID: ThreadId = thread::current().id();
}
ID.try_with(|id| *id)
.unwrap_or_else(|_| thread::current().id())
}
struct Checked<F> {
id: ThreadId,
inner: ManuallyDrop<F>,
location: &'static Location<'static>,
}
impl<F> Drop for Checked<F> {
fn drop(&mut self) {
assert!(
self.id == thread_id(),
"local task dropped by a thread that didn't spawn it. Task spawned at {}",
self.location
);
unsafe { ManuallyDrop::drop(&mut self.inner) };
}
}
impl<F: Future> Future for Checked<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
assert!(
self.id == thread_id(),
"local task polled by a thread that didn't spawn it. Task spawned at {}",
self.location
);
unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
}
}
let future = Checked {
id: thread_id(),
inner: ManuallyDrop::new(future),
location: Location::caller(),
};
unsafe {
async_task::Builder::new()
.metadata(metadata)
.spawn_unchecked(move |_| future, schedule)
}
}
pub struct Scope<'a> {
executor: BackgroundExecutor,
priority: Priority,
futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
tx: Option<mpsc::Sender<()>>,
rx: mpsc::Receiver<()>,
lifetime: PhantomData<&'a ()>,
}
impl<'a> Scope<'a> {
fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
let (tx, rx) = mpsc::channel(1);
Self {
executor,
priority,
tx: Some(tx),
rx,
futures: Default::default(),
lifetime: PhantomData,
}
}
pub fn num_cpus(&self) -> usize {
self.executor.num_cpus()
}
#[track_caller]
pub fn spawn<F>(&mut self, f: F)
where
F: Future<Output = ()> + Send + 'a,
{
let tx = self.tx.clone().unwrap();
let f = unsafe {
mem::transmute::<
Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
>(Box::pin(async move {
f.await;
drop(tx);
}))
};
self.futures.push(f);
}
}
impl Drop for Scope<'_> {
fn drop(&mut self) {
self.tx.take().unwrap();
self.executor.block(self.rx.next());
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::{App, TestDispatcher, TestPlatform};
use rand::SeedableRng;
use std::cell::RefCell;
fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
let arc_dispatcher = Arc::new(dispatcher.clone());
let liveness = std::sync::Arc::new(());
let liveness_weak = std::sync::Arc::downgrade(&liveness);
let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
let foreground_executor = ForegroundExecutor::new(arc_dispatcher, liveness_weak);
let platform = TestPlatform::new(background_executor.clone(), foreground_executor);
let asset_source = Arc::new(());
let http_client = http_client::FakeHttpClient::with_404_response();
let app = App::new_app(platform, liveness, asset_source, http_client);
(dispatcher, background_executor, app)
}
#[test]
fn sanity_test_tasks_run() {
let (dispatcher, _background_executor, app) = create_test_app();
let foreground_executor = app.borrow().foreground_executor.clone();
let task_ran = Rc::new(RefCell::new(false));
foreground_executor
.spawn({
let task_ran = Rc::clone(&task_ran);
async move {
*task_ran.borrow_mut() = true;
}
})
.detach();
dispatcher.run_until_parked();
assert!(
*task_ran.borrow(),
"Task should run normally when app is alive"
);
}
#[test]
fn test_task_cancelled_when_app_dropped() {
let (dispatcher, _background_executor, app) = create_test_app();
let foreground_executor = app.borrow().foreground_executor.clone();
let app_weak = Rc::downgrade(&app);
let task_ran = Rc::new(RefCell::new(false));
let task_ran_clone = Rc::clone(&task_ran);
foreground_executor
.spawn(async move {
*task_ran_clone.borrow_mut() = true;
})
.detach();
drop(app);
assert!(app_weak.upgrade().is_none(), "App should have been dropped");
dispatcher.run_until_parked();
assert!(
!*task_ran.borrow(),
"Task should have been cancelled when app was dropped, but it ran!"
);
}
#[test]
fn test_nested_tasks_both_cancel() {
let (dispatcher, _background_executor, app) = create_test_app();
let foreground_executor = app.borrow().foreground_executor.clone();
let app_weak = Rc::downgrade(&app);
let outer_completed = Rc::new(RefCell::new(false));
let inner_completed = Rc::new(RefCell::new(false));
let reached_await = Rc::new(RefCell::new(false));
let outer_flag = Rc::clone(&outer_completed);
let inner_flag = Rc::clone(&inner_completed);
let await_flag = Rc::clone(&reached_await);
let (tx, rx) = futures::channel::oneshot::channel::<()>();
let inner_executor = foreground_executor.clone();
foreground_executor
.spawn(async move {
let inner_task = inner_executor.spawn({
let inner_flag = Rc::clone(&inner_flag);
async move {
rx.await.ok();
*inner_flag.borrow_mut() = true;
}
});
*await_flag.borrow_mut() = true;
inner_task.await;
*outer_flag.borrow_mut() = true;
})
.detach();
dispatcher.run_until_parked();
assert!(
*reached_await.borrow(),
"Outer task should have reached the await point"
);
assert!(
!*outer_completed.borrow(),
"Outer task should not have completed yet"
);
assert!(
!*inner_completed.borrow(),
"Inner task should not have completed yet"
);
drop(tx);
drop(app);
assert!(app_weak.upgrade().is_none(), "App should have been dropped");
dispatcher.run_until_parked();
assert!(
!*outer_completed.borrow(),
"Outer task should have been cancelled, not completed"
);
assert!(
!*inner_completed.borrow(),
"Inner task should have been cancelled, not completed"
);
}
#[test]
#[should_panic]
fn test_polling_cancelled_task_panics() {
let (dispatcher, background_executor, app) = create_test_app();
let foreground_executor = app.borrow().foreground_executor.clone();
let app_weak = Rc::downgrade(&app);
let task = foreground_executor.spawn(async move { 42 });
drop(app);
assert!(app_weak.upgrade().is_none(), "App should have been dropped");
dispatcher.run_until_parked();
background_executor.block(task);
}
#[test]
fn test_polling_cancelled_task_returns_none_with_fallible() {
let (dispatcher, background_executor, app) = create_test_app();
let foreground_executor = app.borrow().foreground_executor.clone();
let app_weak = Rc::downgrade(&app);
let task = foreground_executor.spawn(async move { 42 }).fallible();
drop(app);
assert!(app_weak.upgrade().is_none(), "App should have been dropped");
dispatcher.run_until_parked();
let result = background_executor.block(task);
assert_eq!(result, None, "Cancelled task should return None");
}
}