1use std::any::{Any, TypeId};
6use std::cell::{Cell, RefCell};
7use std::collections::{HashMap, VecDeque};
8use std::fmt;
9use std::future::Future;
10use std::rc::{Rc, Weak};
11
12use auralis_signal::{Memo, Signal};
13
14use crate::executor;
15use crate::Priority;
16
17type ScopeId = u64;
18type TaskId = u64;
19
20thread_local! {
25 static NEXT_SCOPE_ID: Cell<ScopeId> = const { Cell::new(1) };
26}
27
28fn alloc_scope_id() -> ScopeId {
29 NEXT_SCOPE_ID.with(|c| {
30 let id = c.get();
31 c.set(id + 1);
32 id
33 })
34}
35
36pub struct CallbackHandle {
48 cleanup: Option<Box<dyn FnOnce() + 'static>>,
49}
50
51impl CallbackHandle {
52 pub fn new(cleanup: impl FnOnce() + 'static) -> Self {
54 Self {
55 cleanup: Some(Box::new(cleanup)),
56 }
57 }
58
59 #[must_use]
64 pub fn noop() -> Self {
65 Self { cleanup: None }
66 }
67}
68
69impl Drop for CallbackHandle {
70 fn drop(&mut self) {
71 if let Some(f) = self.cleanup.take() {
72 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
73 }
74 }
75}
76
77type ScopeRegistryEntry = (Weak<RefCell<TaskScopeInner>>, Weak<Cell<bool>>);
98
99thread_local! {
100 static SCOPE_REGISTRY: RefCell<HashMap<ScopeId, ScopeRegistryEntry>> =
101 RefCell::new(HashMap::new());
102}
103
104fn register_scope(id: ScopeId, inner: &Rc<RefCell<TaskScopeInner>>, suspended: &Rc<Cell<bool>>) {
107 let _ = SCOPE_REGISTRY.try_with(|reg| {
108 if let Ok(mut r) = reg.try_borrow_mut() {
109 r.insert(id, (Rc::downgrade(inner), Rc::downgrade(suspended)));
110 }
111 });
112}
113
114fn unregister_scope(id: ScopeId) {
115 let _ = SCOPE_REGISTRY.try_with(|reg| {
116 if let Ok(mut r) = reg.try_borrow_mut() {
117 r.remove(&id);
118 }
119 });
120}
121
122#[must_use]
126pub fn find_scope(scope_id: ScopeId) -> Option<TaskScope> {
127 SCOPE_REGISTRY
128 .try_with(|reg| {
129 if let Ok(r) = reg.try_borrow() {
130 r.get(&scope_id).and_then(|(inner_weak, suspended_weak)| {
131 let inner = inner_weak.upgrade()?;
132 let suspended = suspended_weak.upgrade()?;
133 let cancelled = inner.borrow().cancelled.clone();
134 Some(TaskScope {
135 inner,
136 cancelled,
137 suspended,
138 })
139 })
140 } else {
141 None
142 }
143 })
144 .ok()
145 .flatten()
146}
147
148#[cfg(feature = "debug")]
152#[doc(hidden)]
153#[must_use]
154pub fn scope_debug_label(scope_id: ScopeId) -> Option<String> {
155 find_scope(scope_id).and_then(|s| s.inner.borrow().debug_label.clone())
156}
157
158#[doc(hidden)]
160pub fn clear_scope_registry() {
161 let _ = SCOPE_REGISTRY.try_with(|reg| {
162 if let Ok(mut r) = reg.try_borrow_mut() {
163 r.clear();
164 }
165 });
166}
167
168type ScopeSetFn = fn(Option<TaskScope>);
178type ScopeGetFn = fn() -> Option<TaskScope>;
179
180pub struct ScopeStore {
187 pub set_fn: ScopeSetFn,
189 pub get_fn: ScopeGetFn,
191}
192
193use std::sync::OnceLock;
194static SCOPE_STORE: OnceLock<ScopeStore> = OnceLock::new();
195
196fn ensure_default_store() -> &'static ScopeStore {
197 SCOPE_STORE.get_or_init(|| ScopeStore {
198 set_fn: thread_local_set,
199 get_fn: thread_local_get,
200 })
201}
202
203pub fn set_scope_store(store: ScopeStore) {
219 let _ = SCOPE_STORE.set(store);
220}
221
222thread_local! {
249 static CURRENT_SCOPE: RefCell<Option<TaskScope>> = const { RefCell::new(None) };
250}
251
252fn thread_local_set(scope: Option<TaskScope>) {
253 CURRENT_SCOPE.with(|cell| {
254 cell.replace(scope);
255 });
256}
257
258fn thread_local_get() -> Option<TaskScope> {
259 CURRENT_SCOPE.with(|cell| cell.borrow().clone())
260}
261
262pub(crate) fn set_scope_direct(scope: Option<TaskScope>) {
267 let store = ensure_default_store();
268 (store.set_fn)(scope);
269}
270
271pub(crate) fn get_scope_direct() -> Option<TaskScope> {
273 let store = ensure_default_store();
274 (store.get_fn)()
275}
276
277#[cfg(feature = "ssr-tokio")]
293pub fn init_scope_store_tokio() {
294 tokio::task_local! {
295 static TK_SCOPE: std::cell::RefCell<Option<TaskScope>>;
296 }
297
298 let _ = TK_SCOPE.try_with(|cell| {
300 cell.replace(None);
301 });
302
303 set_scope_store(ScopeStore {
304 set_fn: |s| {
305 let _ = TK_SCOPE.try_with(|cell| {
306 cell.replace(s);
307 });
308 },
309 get_fn: || {
310 TK_SCOPE
311 .try_with(|cell| cell.borrow().clone())
312 .ok()
313 .flatten()
314 },
315 });
316}
317
318pub fn with_current_scope<R>(scope: &TaskScope, f: impl FnOnce() -> R) -> R {
328 let store = ensure_default_store();
329 let prev = (store.get_fn)();
330 (store.set_fn)(Some(scope.clone_inner()));
331 let result = f();
332 (store.set_fn)(prev);
333 result
334}
335
336#[must_use]
338pub fn current_scope() -> Option<TaskScope> {
339 let store = ensure_default_store();
340 (store.get_fn)()
341}
342
343struct TaskScopeInner {
348 id: ScopeId,
349 task_ids: Vec<TaskId>,
350 children: Vec<TaskScope>,
351 parent: Option<Weak<RefCell<TaskScopeInner>>>,
353 context: RefCell<HashMap<TypeId, Rc<dyn Any>>>,
355 callbacks: RefCell<Vec<CallbackHandle>>,
357 cancelled: Rc<Cell<bool>>,
362 #[cfg(feature = "debug")]
364 debug_label: Option<String>,
365 executor: executor::ExecutorRef,
370}
371
372pub struct JoinHandle {
382 task_id: Option<TaskId>,
383 executor: executor::ExecutorRef,
384}
385
386impl JoinHandle {
387 pub fn cancel(&self) {
392 if let Some(tid) = self.task_id {
393 executor::cancel_task(&self.executor, tid);
394 }
395 }
396
397 pub fn is_finished(&self) -> bool {
402 match self.task_id {
403 Some(tid) => executor::is_task_finished(&self.executor, tid),
404 None => true,
405 }
406 }
407
408 pub fn task_id(&self) -> Option<TaskId> {
411 self.task_id
412 }
413}
414
415impl fmt::Debug for JoinHandle {
416 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
417 f.debug_struct("JoinHandle")
418 .field("task_id", &self.task_id)
419 .finish_non_exhaustive()
420 }
421}
422
423#[must_use]
449pub struct TaskScope {
450 inner: Rc<RefCell<TaskScopeInner>>,
451 cancelled: Rc<Cell<bool>>,
457 suspended: Rc<Cell<bool>>,
460}
461
462impl TaskScope {
463 pub fn new() -> Self {
467 Self::with_executor(&executor::current_executor_instance())
468 }
469
470 pub fn with_executor(ex: &executor::ExecutorRef) -> Self {
476 let cancelled = Rc::new(Cell::new(false));
477 let inner = Rc::new(RefCell::new(TaskScopeInner {
478 id: alloc_scope_id(),
479 task_ids: Vec::new(),
480 children: Vec::new(),
481 parent: None,
482 context: RefCell::new(HashMap::new()),
483 callbacks: RefCell::new(Vec::new()),
484 cancelled: Rc::clone(&cancelled),
485 #[cfg(feature = "debug")]
486 debug_label: None,
487 executor: Rc::clone(ex),
488 }));
489 let id = inner.borrow().id;
490 let suspended = Rc::new(Cell::new(false));
491 register_scope(id, &inner, &suspended);
492 Self {
493 inner,
494 cancelled,
495 suspended,
496 }
497 }
498
499 pub fn new_child(parent: &Self) -> Self {
501 let ex = parent.inner.borrow().executor.clone();
502 let cancelled = Rc::new(Cell::new(false));
503 let inner = Rc::new(RefCell::new(TaskScopeInner {
504 id: alloc_scope_id(),
505 task_ids: Vec::new(),
506 children: Vec::new(),
507 parent: Some(Rc::downgrade(&parent.inner)),
508 context: RefCell::new(HashMap::new()),
509 callbacks: RefCell::new(Vec::new()),
510 cancelled: Rc::clone(&cancelled),
511 #[cfg(feature = "debug")]
512 debug_label: None,
513 executor: ex,
514 }));
515 let id = inner.borrow().id;
516 let suspended = Rc::new(Cell::new(false));
517 register_scope(id, &inner, &suspended);
518 let child = Self {
519 inner,
520 cancelled,
521 suspended,
522 };
523 parent.inner.borrow_mut().children.push(child.clone_inner());
524 child
525 }
526
527 pub fn spawn(&self, future: impl Future<Output = ()> + 'static) -> JoinHandle {
533 self.spawn_with_priority(Priority::Low, future)
534 }
535
536 pub fn spawn_with_priority(
544 &self,
545 priority: Priority,
546 future: impl Future<Output = ()> + 'static,
547 ) -> JoinHandle {
548 let (cancelled, ex, scope_id) = {
554 let inner = self.inner.borrow();
555 (inner.cancelled.get(), Rc::clone(&inner.executor), inner.id)
556 };
557 if cancelled {
558 return JoinHandle {
559 task_id: None,
560 executor: ex,
561 };
562 }
563 let task_id = executor::with_executor(&ex, || {
564 with_current_scope(self, || {
565 executor::spawn_scoped_on(&ex, priority, scope_id, future)
566 })
567 });
568 self.inner.borrow_mut().task_ids.push(task_id);
569 JoinHandle {
570 task_id: Some(task_id),
571 executor: ex,
572 }
573 }
574
575 pub fn watch<T: Clone + 'static>(
588 &self,
589 sig: &Signal<T>,
590 f: impl FnMut(&T) + 'static,
591 ) -> JoinHandle {
592 let s = sig.clone();
593 let mut f = f;
594 self.spawn(async move {
595 loop {
596 s.changed().await;
597 f(&s.read());
598 }
599 })
600 }
601
602 pub fn watch_effect(&self, effect: impl Fn() + 'static) -> JoinHandle {
611 let memo = Memo::new(effect);
612 self.spawn(async move {
613 loop {
614 memo.changed().await;
615 #[allow(clippy::let_unit_value, clippy::ignored_unit_patterns)]
616 let _ = memo.read();
617 }
618 })
619 }
620
621 pub fn register_callback_handle(&self, handle: CallbackHandle) {
629 let inner = self.inner.borrow();
630 if inner.cancelled.get() {
631 return;
632 }
633 inner.callbacks.borrow_mut().push(handle);
634 }
635
636 pub fn on_cleanup(&self, f: impl FnOnce() + 'static) {
645 self.register_callback_handle(CallbackHandle::new(f));
646 }
647
648 pub fn provide<T: 'static>(&self, value: T) {
656 self.inner
657 .borrow()
658 .context
659 .borrow_mut()
660 .insert(TypeId::of::<T>(), Rc::new(value));
661 }
662
663 #[must_use]
668 pub fn consume<T: 'static>(&self) -> Option<Rc<T>> {
669 let mut current = Some(Rc::clone(&self.inner));
670
671 while let Some(inner) = current {
672 {
674 let inner_ref = inner.borrow();
675 let ctx = inner_ref.context.borrow();
676 if let Some(val) = ctx.get(&TypeId::of::<T>()) {
677 if let Ok(downcast) = val.clone().downcast::<T>() {
678 return Some(downcast);
679 }
680 }
681 }
682
683 let parent = {
685 let inner_ref = inner.borrow();
686 inner_ref.parent.as_ref().and_then(Weak::upgrade)
687 };
688 current = parent;
689 }
690
691 None
692 }
693
694 #[must_use]
701 #[track_caller]
702 pub fn expect_context<T: 'static>(&self) -> Rc<T> {
703 self.consume::<T>()
704 .unwrap_or_else(|| panic!("context not found: {}", std::any::type_name::<T>()))
705 }
706
707 #[must_use]
711 pub fn is_cancelled(&self) -> bool {
712 self.cancelled.get()
713 }
714
715 #[cfg(feature = "debug")]
721 pub fn set_debug_label(&self, label: impl Into<String>) {
722 self.inner.borrow_mut().debug_label = Some(label.into());
723 }
724
725 #[cfg(test)]
729 #[must_use]
730 pub fn task_count(&self) -> usize {
731 self.inner.borrow().task_ids.len()
732 }
733
734 #[cfg(test)]
736 #[must_use]
737 pub fn child_count(&self) -> usize {
738 self.inner.borrow().children.len()
739 }
740
741 fn clone_inner(&self) -> Self {
744 Self {
745 inner: Rc::clone(&self.inner),
746 cancelled: Rc::clone(&self.cancelled),
747 suspended: Rc::clone(&self.suspended),
748 }
749 }
750
751 pub fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
756 with_current_scope(self, f)
757 }
758
759 pub fn suspend(&self) {
769 if self.suspended.get() {
770 return;
771 }
772 self.suspended.set(true);
773 let children: Vec<TaskScope> = {
775 self.inner
776 .borrow()
777 .children
778 .iter()
779 .map(TaskScope::clone_inner)
780 .collect()
781 };
782 for child in &children {
783 child.suspend();
784 }
785 }
786
787 pub fn resume(&self) {
792 if !self.suspended.get() {
793 return;
794 }
795 self.suspended.set(false);
796
797 let (task_ids, children) = {
798 let inner = self.inner.borrow();
799 let tids = inner.task_ids.clone();
800 let children: Vec<TaskScope> =
801 inner.children.iter().map(TaskScope::clone_inner).collect();
802 (tids, children)
803 };
804
805 let ex = Rc::clone(&self.inner.borrow().executor);
807 executor::enqueue_scope_tasks_on(&ex, &task_ids);
808
809 for child in &children {
811 child.resume();
812 }
813 }
814
815 #[must_use]
817 pub fn is_suspended(&self) -> bool {
818 self.suspended.get()
819 }
820}
821
822impl Default for TaskScope {
823 fn default() -> Self {
824 Self::new()
825 }
826}
827
828impl Clone for TaskScope {
829 fn clone(&self) -> Self {
830 self.clone_inner()
831 }
832}
833
834impl Drop for TaskScope {
841 fn drop(&mut self) {
842 if Rc::strong_count(&self.inner) > 1 {
847 return;
848 }
849
850 self.cancelled.set(true);
854
855 let Ok(mut inner) = self.inner.try_borrow_mut() else {
856 eprintln!(
861 "[auralis-task] WARNING: TaskScope::drop cannot borrow inner \
862 (already borrowed). Tasks and callbacks in this scope will \
863 be cleaned up on the next executor flush. Avoid dropping \
864 the last TaskScope clone inside a callback."
865 );
866 return;
867 };
868
869 inner.callbacks.borrow_mut().clear();
871
872 let mut descendants: Vec<Rc<RefCell<TaskScopeInner>>> = Vec::new();
874 {
875 let mut queue: VecDeque<Rc<RefCell<TaskScopeInner>>> = VecDeque::new();
876 for child in &inner.children {
877 queue.push_back(Rc::clone(&child.inner));
878 }
879
880 while let Some(scope_rc) = queue.pop_front() {
881 let scope = scope_rc.borrow();
882 for child in &scope.children {
883 queue.push_back(Rc::clone(&child.inner));
884 }
885 descendants.push(Rc::clone(&scope_rc));
886 }
887 }
888
889 for scope_rc in descendants.iter().rev() {
891 let mut scope = scope_rc.borrow_mut();
892 if scope.cancelled.get() {
893 continue;
894 }
895 scope.cancelled.set(true);
896
897 scope.callbacks.borrow_mut().clear();
899
900 if !scope.task_ids.is_empty() {
901 let ex = Rc::clone(&scope.executor);
902 let task_ids = std::mem::take(&mut scope.task_ids);
903 let dropped_futures = executor::cancel_scope_tasks_on(&ex, &task_ids);
904 drop(dropped_futures);
905 }
906 scope.context.borrow_mut().clear();
907 unregister_scope(scope.id);
908 }
909
910 if !inner.task_ids.is_empty() {
912 let ex = Rc::clone(&inner.executor);
913 let task_ids = std::mem::take(&mut inner.task_ids);
914 let dropped_futures = executor::cancel_scope_tasks_on(&ex, &task_ids);
915 drop(dropped_futures);
916 }
917
918 inner.context.borrow_mut().clear();
919 inner.children.clear();
920
921 unregister_scope(inner.id);
923 }
924}
925
926#[macro_export]
936macro_rules! provide_context {
937 ($scope:expr, $value:expr) => {
938 $scope.provide($value)
939 };
940}
941
942#[macro_export]
948macro_rules! consume_context {
949 ($scope:expr, $ty:ty) => {
950 $scope.consume::<$ty>()
951 };
952}
953
954#[cfg(test)]
959#[allow(clippy::items_after_statements)]
960mod tests {
961 use super::*;
962 use crate::executor::{self, init_flush_scheduler, reset_executor_for_test, TestScheduleFlush};
963 use crate::{init_time_source, ScheduleFlush, TestTimeSource, TimeSource};
964 use auralis_signal::Signal;
965 use std::cell::{Cell, RefCell};
966 use std::rc::Rc;
967 use std::time::Duration;
968
969 fn init() {
970 reset_executor_for_test();
971 init_flush_scheduler(Rc::new(TestScheduleFlush));
972 }
973
974 #[test]
977 fn new_scope_has_zero_tasks() {
978 let scope = TaskScope::new();
979 assert_eq!(scope.task_count(), 0);
980 assert_eq!(scope.child_count(), 0);
981 }
982
983 #[test]
984 fn new_child_attaches_to_parent() {
985 let parent = TaskScope::new();
986 let _child = TaskScope::new_child(&parent);
987 assert_eq!(parent.child_count(), 1);
988 }
989
990 #[test]
991 fn spawn_adds_task() {
992 init();
993 let scope = TaskScope::new();
994 scope.spawn(async {});
995 assert_eq!(scope.task_count(), 1);
996 }
997
998 #[test]
999 fn spawn_and_complete() {
1000 init();
1001 let done = Rc::new(Cell::new(false));
1002 let done2 = Rc::clone(&done);
1003 spawn_global(async move {
1004 done2.set(true);
1005 });
1006 assert!(done.get());
1007 }
1008
1009 #[test]
1010 fn scope_spawn_and_cancel() {
1011 init();
1012 let dropped = Rc::new(Cell::new(false));
1013 {
1014 let scope = TaskScope::new();
1015 let d = Rc::clone(&dropped);
1016 struct DropCheck(Rc<Cell<bool>>);
1017 impl Drop for DropCheck {
1018 fn drop(&mut self) {
1019 self.0.set(true);
1020 }
1021 }
1022 scope.spawn(async move {
1023 let _guard = DropCheck(d);
1024 std::future::pending::<()>().await;
1025 });
1026 assert_eq!(executor::debug_task_count(), 1);
1027 }
1028 assert!(dropped.get());
1029 assert_eq!(executor::debug_task_count(), 0);
1030 }
1031
1032 #[test]
1033 fn nested_scope_child_cancel_with_parent() {
1034 init();
1035 let dropped_child = Rc::new(Cell::new(false));
1036 {
1037 let parent = TaskScope::new();
1038 let child = TaskScope::new_child(&parent);
1039 let d = Rc::clone(&dropped_child);
1040 struct DropCheck(Rc<Cell<bool>>);
1041 impl Drop for DropCheck {
1042 fn drop(&mut self) {
1043 self.0.set(true);
1044 }
1045 }
1046 child.spawn(async move {
1047 let _guard = DropCheck(d);
1048 std::future::pending::<()>().await;
1049 });
1050 assert_eq!(executor::debug_task_count(), 1);
1051 }
1052 assert!(dropped_child.get());
1053 assert_eq!(executor::debug_task_count(), 0);
1054 }
1055
1056 #[test]
1057 fn deeply_nested_scope_drop_no_stack_overflow() {
1058 init();
1059 let root = TaskScope::new();
1060 {
1061 let mut current = TaskScope::new_child(&root);
1062 for _ in 0..199 {
1063 current = TaskScope::new_child(¤t);
1064 }
1065 }
1066 drop(root);
1067 assert_eq!(executor::debug_task_count(), 0);
1068 }
1069
1070 #[test]
1071 fn scope_child_explicit_tree() {
1072 let root = TaskScope::new();
1073 let a = TaskScope::new_child(&root);
1074 let b = TaskScope::new_child(&root);
1075 let _a1 = TaskScope::new_child(&a);
1076 let _a2 = TaskScope::new_child(&a);
1077 assert_eq!(root.child_count(), 2);
1078 assert_eq!(a.child_count(), 2);
1079 assert_eq!(b.child_count(), 0);
1080 }
1081
1082 #[test]
1085 fn callback_handle_dropped_before_tasks() {
1086 init();
1087 let dropped_order: Rc<RefCell<Vec<String>>> = Rc::new(RefCell::new(Vec::new()));
1088 {
1089 let scope = TaskScope::new();
1090 let order1 = Rc::clone(&dropped_order);
1091 scope.register_callback_handle(CallbackHandle::new(move || {
1092 order1.borrow_mut().push("callback".to_string());
1093 }));
1094 let order2 = Rc::clone(&dropped_order);
1095 struct DropCheck {
1096 order: Rc<RefCell<Vec<String>>>,
1097 label: String,
1098 }
1099 impl Drop for DropCheck {
1100 fn drop(&mut self) {
1101 self.order.borrow_mut().push(self.label.clone());
1102 }
1103 }
1104 scope.spawn(async move {
1105 let _guard = DropCheck {
1106 order: order2,
1107 label: "task".to_string(),
1108 };
1109 std::future::pending::<()>().await;
1110 });
1111 }
1112 let order = dropped_order.borrow().clone();
1113 assert_eq!(order, vec!["callback", "task"]);
1114 }
1115
1116 #[test]
1117 fn callback_handle_cleaned_up_on_child_scope_drop() {
1118 init();
1119 let called = Rc::new(Cell::new(false));
1120 {
1121 let parent = TaskScope::new();
1122 let child = TaskScope::new_child(&parent);
1123 let c = Rc::clone(&called);
1124 child.register_callback_handle(CallbackHandle::new(move || {
1125 c.set(true);
1126 }));
1127 }
1129 assert!(called.get());
1130 }
1131
1132 #[test]
1135 fn context_provide_and_consume_in_same_scope() {
1136 let scope = TaskScope::new();
1137 scope.provide(42i32);
1138 assert_eq!(*scope.consume::<i32>().unwrap(), 42);
1139 }
1140
1141 #[test]
1142 fn context_consume_walks_up_to_parent() {
1143 let parent = TaskScope::new();
1144 parent.provide("hello".to_string());
1145 let child = TaskScope::new_child(&parent);
1146 assert_eq!(*child.consume::<String>().unwrap(), "hello");
1147 }
1148
1149 #[test]
1150 fn context_consume_not_found() {
1151 let scope = TaskScope::new();
1152 assert!(scope.consume::<i32>().is_none());
1153 }
1154
1155 #[test]
1156 fn context_removed_on_scope_drop() {
1157 let parent = TaskScope::new();
1158 parent.provide(99u32);
1159 {
1160 let _child = TaskScope::new_child(&parent);
1161 }
1163 assert_eq!(*parent.consume::<u32>().unwrap(), 99);
1165 }
1166
1167 #[test]
1168 fn context_shadowing() {
1169 let parent = TaskScope::new();
1170 parent.provide(1i32);
1171 let child = TaskScope::new_child(&parent);
1172 child.provide(2i32);
1173 assert_eq!(*child.consume::<i32>().unwrap(), 2);
1175 assert_eq!(*parent.consume::<i32>().unwrap(), 1);
1177 }
1178
1179 #[test]
1180 #[should_panic(expected = "context not found")]
1181 fn expect_context_panics_when_missing() {
1182 let scope = TaskScope::new();
1183 let _ = scope.expect_context::<String>();
1184 }
1185
1186 #[test]
1189 fn executor_priority_ordering() {
1190 init();
1191 let order = Rc::new(RefCell::new(Vec::new()));
1192 let o1 = Rc::clone(&order);
1193 executor::spawn_no_auto_flush(Priority::Low, async move {
1194 o1.borrow_mut().push("low");
1195 });
1196 let o2 = Rc::clone(&order);
1197 executor::spawn_no_auto_flush(Priority::High, async move {
1198 o2.borrow_mut().push("high");
1199 });
1200 executor::flush_all();
1201 let result = order.borrow().clone();
1202 assert_eq!(result, vec!["high", "low"]);
1203 }
1204
1205 #[test]
1206 fn executor_batch() {
1207 init();
1208 let counter = Rc::new(Cell::new(0u32));
1209 for _ in 0..10 {
1210 let c = Rc::clone(&counter);
1211 spawn_global(async move {
1212 c.set(c.get() + 1);
1213 });
1214 }
1215 assert_eq!(counter.get(), 10);
1216 assert_eq!(executor::debug_task_count(), 0);
1217 }
1218
1219 #[test]
1220 fn no_leak_on_cancel() {
1221 init();
1222 for _ in 0..50 {
1223 let scope = TaskScope::new();
1224 for _ in 0..5 {
1225 scope.spawn(std::future::pending::<()>());
1226 }
1227 }
1228 assert_eq!(executor::debug_task_count(), 0);
1229 }
1230
1231 #[test]
1232 fn set_deferred_triggers_after_flush() {
1233 use auralis_signal::Signal;
1234 init();
1235 let sig = Signal::new(0);
1236 let observed = Rc::new(Cell::new(0));
1237 set_deferred(&sig, 42);
1238 assert_eq!(sig.read(), 42);
1239 let ob1 = Rc::clone(&observed);
1240 spawn_global(async move {
1241 ob1.set(sig.read());
1242 });
1243 assert_eq!(observed.get(), 42);
1244 }
1245
1246 #[test]
1247 fn set_deferred_in_drop_safe() {
1248 use auralis_signal::Signal;
1249 init();
1250 let sig = Signal::new(0);
1251 struct SetOnDrop {
1252 sig: Signal<i32>,
1253 val: i32,
1254 }
1255 impl Drop for SetOnDrop {
1256 fn drop(&mut self) {
1257 set_deferred(&self.sig, self.val);
1258 }
1259 }
1260 let guard = SetOnDrop {
1261 sig: sig.clone(),
1262 val: 99,
1263 };
1264 drop(guard);
1265 assert_eq!(sig.read(), 99);
1266 }
1267
1268 #[test]
1269 fn set_deferred_from_drop_guard_during_scope_cancel() {
1270 use auralis_signal::Signal;
1271 init();
1272
1273 let sig = Signal::new(0i32);
1274
1275 struct ResetOnDrop {
1279 sig: Signal<i32>,
1280 }
1281 impl Drop for ResetOnDrop {
1282 fn drop(&mut self) {
1283 set_deferred(&self.sig, 42);
1284 }
1285 }
1286
1287 {
1288 let scope = TaskScope::new();
1289 let s = sig.clone();
1290 scope.spawn(async move {
1291 let _guard = ResetOnDrop { sig: s };
1292 std::future::pending::<()>().await;
1295 });
1296 }
1298
1299 assert_eq!(
1301 sig.read(),
1302 42,
1303 "set_deferred should have fired after scope cancel"
1304 );
1305 }
1306
1307 #[test]
1308 fn yield_now_gives_other_tasks_a_turn() {
1309 init();
1310 let order = Rc::new(RefCell::new(Vec::new()));
1311 let o1 = Rc::clone(&order);
1312 executor::spawn_no_auto_flush(Priority::Low, async move {
1313 o1.borrow_mut().push("a1");
1314 executor::yield_now().await;
1315 o1.borrow_mut().push("a2");
1316 });
1317 let o2 = Rc::clone(&order);
1318 executor::spawn_no_auto_flush(Priority::Low, async move {
1319 o2.borrow_mut().push("b1");
1320 o2.borrow_mut().push("b2");
1321 });
1322 executor::flush_all();
1323 let r = order.borrow().clone();
1324 assert_eq!(&r[0..3], &["a1", "b1", "b2"][..]);
1325 assert!(r.contains(&"a2"));
1326 }
1327
1328 #[test]
1329 fn panic_in_task_is_isolated() {
1330 init();
1331 let survived = Rc::new(Cell::new(false));
1332 let s = Rc::clone(&survived);
1333 spawn_global(async move {
1334 panic!("intentional test panic");
1335 });
1336 spawn_global(async move {
1337 s.set(true);
1338 });
1339 assert!(survived.get());
1340 assert_eq!(executor::debug_task_count(), 0);
1341 }
1342
1343 #[test]
1346 fn time_budget_with_test_time_source() {
1347 init();
1348 let ts = Rc::new(TestTimeSource::new(0));
1349 init_time_source(ts.clone());
1350
1351 let polled = Rc::new(Cell::new(0u32));
1352
1353 for _ in 0..50 {
1356 let pc = Rc::clone(&polled);
1357 let ts_c = Rc::clone(&ts);
1358 executor::spawn_no_auto_flush(Priority::Low, async move {
1359 pc.set(pc.get() + 1);
1360 ts_c.advance(1);
1361 });
1362 }
1363
1364 executor::flush_all();
1368
1369 assert_eq!(polled.get(), 50);
1370 assert_eq!(executor::debug_task_count(), 0);
1371 }
1372
1373 #[test]
1374 fn time_budget_honoured_with_split() {
1375 let schedule_count = Rc::new(Cell::new(0u32));
1379 struct NoopScheduleFlush(Rc<Cell<u32>>);
1380 impl ScheduleFlush for NoopScheduleFlush {
1381 fn schedule(&self, _callback: Box<dyn FnOnce()>) {
1382 self.0.set(self.0.get() + 1);
1383 }
1386 }
1387 init_flush_scheduler(Rc::new(NoopScheduleFlush(Rc::clone(&schedule_count))));
1388
1389 let ts = Rc::new(TestTimeSource::new(0));
1390 init_time_source(ts.clone());
1391
1392 let polled = Rc::new(RefCell::new(Vec::new()));
1393
1394 for i in 0..50u32 {
1395 let pc = Rc::clone(&polled);
1396 let ts_c = Rc::clone(&ts);
1397 executor::spawn_no_auto_flush(Priority::Low, async move {
1398 pc.borrow_mut().push(i);
1399 ts_c.advance(1);
1400 });
1401 }
1402
1403 executor::flush_all();
1404
1405 let completed = polled.borrow().len();
1406 assert!(
1407 completed < 50,
1408 "budget should split before all tasks run (only {completed} of 50)"
1409 );
1410 assert!(
1411 completed >= 7,
1412 "at least 7 tasks should run before budget expires ({completed})"
1413 );
1414 assert_eq!(
1415 schedule_count.get(),
1416 1,
1417 "next flush should have been scheduled exactly once"
1418 );
1419
1420 init_flush_scheduler(Rc::new(TestScheduleFlush));
1423 executor::flush_all();
1424 assert_eq!(executor::debug_task_count(), 0);
1425 }
1426
1427 #[test]
1430 fn provide_context_macro_works() {
1431 let scope = TaskScope::new();
1432 provide_context!(scope, 42i32);
1433 assert_eq!(*scope.consume::<i32>().unwrap(), 42);
1434 }
1435
1436 #[test]
1437 fn consume_context_macro_works() {
1438 let scope = TaskScope::new();
1439 scope.provide(99u32);
1440 let val: Option<Rc<u32>> = consume_context!(scope, u32);
1441 assert_eq!(*val.unwrap(), 99);
1442 }
1443
1444 #[test]
1445 fn consume_context_macro_not_found() {
1446 let scope = TaskScope::new();
1447 let val: Option<Rc<String>> = consume_context!(scope, String);
1448 assert!(val.is_none());
1449 }
1450
1451 #[cfg(feature = "debug")]
1454 #[test]
1455 fn dump_task_tree_returns_string() {
1456 init();
1457 let scope = TaskScope::new();
1458 scope.spawn(async { std::future::pending::<()>().await });
1459
1460 let output = crate::dump_task_tree();
1461 assert!(output.contains("Auralis Task Tree"));
1462 assert!(output.contains("Total active tasks: 1"));
1463 assert!(output.contains("Scope"));
1464 }
1465
1466 #[cfg(feature = "debug")]
1467 #[test]
1468 fn dump_task_tree_empty() {
1469 init();
1470 let output = crate::dump_task_tree();
1471 assert!(output.contains("(no active tasks)"));
1472 }
1473
1474 use crate::{set_deferred, spawn_global};
1475
1476 #[test]
1479 fn suspend_prevents_task_execution() {
1480 init();
1481 let scope = TaskScope::new();
1482 let executed = Rc::new(Cell::new(false));
1483 let ex = Rc::clone(&executed);
1484 scope.spawn(async move {
1485 ex.set(true);
1486 });
1487 assert!(executed.get());
1489 executed.set(false);
1490
1491 scope.suspend();
1492 let ex2 = Rc::clone(&executed);
1493 scope.spawn(async move {
1494 ex2.set(true);
1495 });
1496 assert!(!executed.get());
1498 }
1499
1500 #[test]
1501 fn resume_allows_task_execution() {
1502 init();
1503 let scope = TaskScope::new();
1504 scope.suspend();
1505 let executed = Rc::new(Cell::new(false));
1506 let ex = Rc::clone(&executed);
1507 scope.spawn(async move {
1508 ex.set(true);
1509 });
1510 assert!(!executed.get());
1511
1512 scope.resume();
1513 assert!(executed.get());
1515 }
1516
1517 #[test]
1518 fn suspend_cascades_to_children() {
1519 init();
1520 let parent = TaskScope::new();
1521 let child = TaskScope::new_child(&parent);
1522 assert!(!child.is_suspended());
1523
1524 parent.suspend();
1525 assert!(parent.is_suspended());
1526 assert!(child.is_suspended());
1527 }
1528
1529 #[test]
1530 fn resume_cascades_to_children() {
1531 init();
1532 let parent = TaskScope::new();
1533 let child = TaskScope::new_child(&parent);
1534 parent.suspend();
1535 assert!(child.is_suspended());
1536
1537 parent.resume();
1538 assert!(!parent.is_suspended());
1539 assert!(!child.is_suspended());
1540 }
1541
1542 #[test]
1543 fn multiple_suspend_resume_no_leak() {
1544 init();
1545 let scope = TaskScope::new();
1546 for _ in 0..50 {
1547 scope.suspend();
1548 assert!(scope.is_suspended());
1549 scope.resume();
1550 assert!(!scope.is_suspended());
1551 }
1552 }
1554
1555 #[test]
1556 fn suspended_scope_drops_without_panic() {
1557 init();
1558 {
1559 let scope = TaskScope::new();
1560 scope.suspend();
1561 let d = Rc::new(Cell::new(false));
1562 struct DropCheck(Rc<Cell<bool>>);
1563 impl Drop for DropCheck {
1564 fn drop(&mut self) {
1565 self.0.set(true);
1566 }
1567 }
1568 scope.spawn(async move {
1569 let _guard = DropCheck(d);
1570 std::future::pending::<()>().await;
1571 });
1572 }
1575 assert_eq!(executor::debug_task_count(), 0);
1577 }
1578
1579 #[test]
1580 fn siblings_not_affected_by_suspend() {
1581 init();
1582 let parent = TaskScope::new();
1583 let child_a = TaskScope::new_child(&parent);
1584 let child_b = TaskScope::new_child(&parent);
1585
1586 child_a.suspend();
1587 assert!(child_a.is_suspended());
1588 assert!(!child_b.is_suspended());
1589 assert!(!parent.is_suspended());
1590 }
1591
1592 use crate::Executor;
1595
1596 #[test]
1597 fn flush_instance_panicking_task_is_isolated() {
1598 init();
1599 let ex = Executor::new_instance();
1600 Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));
1601
1602 let survived = Rc::new(Cell::new(false));
1603 let s = Rc::clone(&survived);
1604
1605 Executor::spawn(&ex, async move {
1606 panic!("intentional test panic in instance executor");
1607 });
1608 Executor::spawn(&ex, async move {
1609 s.set(true);
1610 });
1611 Executor::flush_instance(&ex);
1612
1613 assert!(survived.get());
1614 }
1615
1616 #[test]
1617 fn flush_instance_spawn_and_complete() {
1618 init();
1619 let ex = Executor::new_instance();
1620 Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));
1621
1622 let counter = Rc::new(Cell::new(0u32));
1623 for _ in 0..20 {
1624 let c = Rc::clone(&counter);
1625 Executor::spawn(&ex, async move {
1626 c.set(c.get() + 1);
1627 });
1628 }
1629 Executor::flush_instance(&ex);
1630 assert_eq!(counter.get(), 20);
1631 }
1632
1633 use crate::timer;
1636
1637 #[test]
1638 fn timer_zero_duration_completes_immediately() {
1639 init();
1640 let done = Rc::new(Cell::new(false));
1641 let d = Rc::clone(&done);
1642 spawn_global(async move {
1643 timer::sleep(Duration::ZERO).await;
1644 d.set(true);
1645 });
1646 assert!(done.get());
1648 }
1649
1650 #[test]
1651 fn timer_normal_delay_fires_after_time_advances() {
1652 init();
1653 let ts = Rc::new(TestTimeSource::new(0));
1654 init_time_source(Rc::clone(&ts) as Rc<dyn TimeSource>);
1655
1656 let done = Rc::new(Cell::new(false));
1657 let d = Rc::clone(&done);
1658 spawn_global(async move {
1659 timer::sleep(Duration::from_millis(100)).await;
1660 d.set(true);
1661 });
1662 assert!(!done.get());
1664
1665 ts.advance(150);
1668 crate::executor::flush_all();
1669 assert!(done.get());
1670 }
1671
1672 #[test]
1673 fn timer_across_multiple_flushes() {
1674 init();
1675 let ts = Rc::new(TestTimeSource::new(0));
1676 init_time_source(Rc::clone(&ts) as Rc<dyn TimeSource>);
1677
1678 let counter = Rc::new(Cell::new(0u32));
1679 let c = Rc::clone(&counter);
1680 spawn_global(async move {
1681 for _ in 0..3 {
1682 timer::sleep(Duration::from_millis(100)).await;
1683 c.set(c.get() + 1);
1684 }
1685 });
1686 assert_eq!(counter.get(), 0);
1687
1688 ts.advance(100);
1689 crate::executor::flush_all();
1690 assert_eq!(counter.get(), 1);
1691
1692 ts.advance(100);
1693 crate::executor::flush_all();
1694 assert_eq!(counter.get(), 2);
1695
1696 ts.advance(100);
1697 crate::executor::flush_all();
1698 assert_eq!(counter.get(), 3);
1699 }
1700
1701 #[test]
1702 fn timer_cancelled_by_scope_drop() {
1703 init();
1704 let executed = Rc::new(Cell::new(false));
1705 let ex = Rc::clone(&executed);
1706 {
1707 let scope = TaskScope::new();
1708 scope.spawn(async move {
1709 timer::sleep(Duration::from_millis(500)).await;
1710 ex.set(true);
1711 });
1712 }
1713 assert!(!executed.get());
1716 assert_eq!(executor::debug_task_count(), 0);
1717 }
1718
1719 #[test]
1720 fn reentrant_flush_is_noop() {
1721 init();
1722 let reentered = Rc::new(Cell::new(false));
1729 let r = Rc::clone(&reentered);
1730 let sig = Signal::new(0);
1731 auralis_signal::subscribe(&sig, Rc::new(move || r.set(true)));
1732 sig.set(1);
1736 assert!(reentered.get());
1737 }
1738
1739 #[test]
1740 fn instance_executor_timer() {
1741 init();
1742 let ex = Executor::new_instance();
1743 Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));
1744 let ts = Rc::new(TestTimeSource::new(0));
1745 Executor::install_time_source(&ex, Rc::clone(&ts) as Rc<dyn TimeSource>);
1746
1747 let done = Rc::new(Cell::new(false));
1748 let d = Rc::clone(&done);
1749 Executor::spawn(&ex, async move {
1750 timer::sleep(Duration::from_millis(50)).await;
1751 d.set(true);
1752 });
1753 assert!(!done.get());
1754
1755 ts.advance(60);
1757 Executor::flush_instance(&ex);
1758 assert!(done.get());
1759 }
1760
1761 #[test]
1762 fn set_deferred_routes_to_instance_executor() {
1763 init();
1764 let ex = Executor::new_instance();
1765 Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));
1766
1767 let sig = Signal::new(0);
1768 let s = sig.clone();
1769
1770 Executor::spawn(&ex, async move {
1772 crate::set_deferred(&s, 42);
1773 });
1774
1775 Executor::flush_instance(&ex);
1777 assert_eq!(sig.read(), 42);
1779 }
1780
1781 #[test]
1784 fn panic_hook_is_invoked_on_task_panic() {
1785 init();
1786 let hook_called = Rc::new(Cell::new(false));
1787 let hc = Rc::clone(&hook_called);
1788
1789 crate::set_panic_hook(Rc::new(move |_info| {
1790 hc.set(true);
1791 }));
1792
1793 let scope = TaskScope::new();
1794 scope.spawn(async move { panic!("intentional") });
1795
1796 assert!(hook_called.get());
1798 }
1799
1800 #[test]
1801 fn current_scope_available_in_spawned_task() {
1802 init();
1803 let scope = TaskScope::new();
1804 let found = Rc::new(Cell::new(false));
1805 let f = Rc::clone(&found);
1806 scope.spawn(async move {
1807 f.set(crate::current_scope().is_some());
1808 });
1809 assert!(found.get());
1810 }
1811
1812 #[test]
1813 fn callback_handle_noop_does_not_panic() {
1814 let _h = crate::CallbackHandle::noop();
1815 }
1817
1818 #[test]
1819 fn sync_callback_fallback_without_schedule_hook() {
1820 crate::reset_executor_for_test();
1823 let sig = Signal::new(0);
1826 let called = Rc::new(Cell::new(false));
1827 let c = Rc::clone(&called);
1828 auralis_signal::subscribe(&sig, Rc::new(move || c.set(true)));
1829
1830 sig.set(1);
1831 assert!(called.get());
1833 }
1834
1835 #[test]
1836 fn set_deferred_isolated_to_instance_executor() {
1837 init();
1838 let ex1 = Executor::new_instance();
1839 Executor::install_flush_scheduler(&ex1, Rc::new(TestScheduleFlush));
1840 let ex2 = Executor::new_instance();
1841 Executor::install_flush_scheduler(&ex2, Rc::new(TestScheduleFlush));
1842
1843 let sig1 = Signal::new(0);
1844 let sig2 = Signal::new(0);
1845 let s1 = sig1.clone();
1846
1847 crate::with_executor(&ex1, || {
1849 crate::set_deferred(&s1, 42);
1850 });
1851 Executor::flush_instance(&ex1);
1852 assert_eq!(sig1.read(), 42);
1853 assert_eq!(sig2.read(), 0);
1855 }
1856
1857 #[test]
1858 fn notify_signal_state_follow_up_handles_reentrant_dirty() {
1859 let sig = Signal::new(0);
1862 let sig2 = sig.clone();
1863 let count = Rc::new(Cell::new(0u32));
1864 let c = Rc::clone(&count);
1865
1866 auralis_signal::subscribe(
1867 &sig,
1868 Rc::new(move || {
1869 c.set(c.get() + 1);
1870 if c.get() == 1 {
1872 sig2.set(2);
1873 }
1874 }),
1875 );
1876
1877 sig.set(1);
1878 assert_eq!(sig.read(), 2);
1881 assert_eq!(count.get(), 2);
1882 }
1883
1884 #[test]
1887 fn nested_spawn_preserves_outer_polling_task() {
1888 init();
1892 let executed = Rc::new(Cell::new(false));
1893 let ex = Rc::clone(&executed);
1894
1895 let scope = TaskScope::new();
1896 scope.spawn(async move {
1897 crate::spawn_global(async {});
1901 timer::sleep(Duration::ZERO).await;
1904 ex.set(true);
1905 });
1906
1907 assert!(executed.get());
1908 }
1909
1910 #[test]
1911 fn nested_spawn_preserves_polling_task_for_nonzero_timer() {
1912 init();
1913 let ts = Rc::new(TestTimeSource::new(0));
1914 init_time_source(Rc::clone(&ts) as Rc<dyn TimeSource>);
1915
1916 let executed = Rc::new(Cell::new(false));
1917 let ex = Rc::clone(&executed);
1918
1919 let scope = TaskScope::new();
1920 scope.spawn(async move {
1921 crate::spawn_global(async {});
1923 timer::sleep(Duration::from_millis(10)).await;
1925 ex.set(true);
1926 });
1927
1928 assert!(!executed.get());
1930 ts.advance(20);
1931 crate::executor::flush_all();
1932 assert!(executed.get());
1933 }
1934
1935 #[test]
1938 fn batch_spawn_with_no_auto_flush_then_manual_flush() {
1939 init();
1942
1943 let order = Rc::new(RefCell::new(Vec::new()));
1944 let o1 = Rc::clone(&order);
1945 let o2 = Rc::clone(&order);
1946
1947 executor::spawn_no_auto_flush(Priority::Low, async move {
1948 o1.borrow_mut().push("a");
1949 });
1950
1951 executor::spawn_no_auto_flush(Priority::Low, async move {
1952 o2.borrow_mut().push("b");
1953 });
1954
1955 executor::flush_all();
1956
1957 let r = order.borrow().clone();
1958 assert_eq!(r.len(), 2);
1959 assert!(r.contains(&"a"));
1960 assert!(r.contains(&"b"));
1961 }
1962
1963 #[test]
1964 fn spawn_many_tasks_all_complete() {
1965 init();
1968
1969 let counter = Rc::new(Cell::new(0u32));
1970 for _ in 0..20 {
1971 let c = Rc::clone(&counter);
1972 spawn_global(async move {
1973 c.set(c.get() + 1);
1974 });
1975 }
1976 assert_eq!(counter.get(), 20);
1977 assert_eq!(executor::debug_task_count(), 0);
1978 }
1979
1980 #[test]
1983 fn instance_executor_create_drop_recreate_works() {
1984 init();
1987
1988 for _ in 0..10 {
1989 let ex = Executor::new_instance();
1990 Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));
1991 let done = Rc::new(Cell::new(false));
1992 let d = Rc::clone(&done);
1993 Executor::spawn(&ex, async move {
1994 d.set(true);
1995 });
1996 Executor::flush_instance(&ex);
1997 assert!(done.get());
1998 }
1999 }
2001
2002 #[test]
2003 fn executor_drop_recreate_signal_subscriptions_cleaned_up() {
2004 init();
2009
2010 let sig = Signal::new(0i32);
2011
2012 let ex1 = Executor::new_instance();
2013 Executor::install_flush_scheduler(&ex1, Rc::new(TestScheduleFlush));
2014
2015 let s = sig.clone();
2018 Executor::spawn(&ex1, async move {
2019 s.changed().await;
2020 });
2021
2022 drop(ex1);
2025
2026 let ex2 = Executor::new_instance();
2028 Executor::install_flush_scheduler(&ex2, Rc::new(TestScheduleFlush));
2029
2030 sig.set(42);
2034
2035 let done2 = Rc::new(Cell::new(false));
2037 let d2 = Rc::clone(&done2);
2038 Executor::spawn(&ex2, async move {
2039 d2.set(true);
2040 });
2041 Executor::flush_instance(&ex2);
2042 assert!(done2.get());
2043 }
2044
2045 #[test]
2046 fn multiple_instance_executors_independent_timers() {
2047 init();
2050 let ts = Rc::new(TestTimeSource::new(0));
2051 init_time_source(Rc::clone(&ts) as Rc<dyn TimeSource>);
2052
2053 let ex1 = Executor::new_instance();
2054 Executor::install_flush_scheduler(&ex1, Rc::new(TestScheduleFlush));
2055 Executor::install_time_source(&ex1, Rc::clone(&ts) as Rc<dyn TimeSource>);
2056
2057 let ex2 = Executor::new_instance();
2058 Executor::install_flush_scheduler(&ex2, Rc::new(TestScheduleFlush));
2059 Executor::install_time_source(&ex2, Rc::clone(&ts) as Rc<dyn TimeSource>);
2060
2061 let done1 = Rc::new(Cell::new(false));
2062 let done2 = Rc::new(Cell::new(false));
2063 let d1 = Rc::clone(&done1);
2064 let d2 = Rc::clone(&done2);
2065
2066 Executor::spawn(&ex1, async move {
2067 timer::sleep(Duration::from_millis(100)).await;
2068 d1.set(true);
2069 });
2070 Executor::spawn(&ex2, async move {
2071 timer::sleep(Duration::from_millis(50)).await;
2072 d2.set(true);
2073 });
2074
2075 assert!(!done1.get());
2077 assert!(!done2.get());
2078
2079 ts.advance(60);
2081 Executor::flush_instance(&ex2);
2082 assert!(!done1.get());
2083 assert!(done2.get());
2084
2085 ts.advance(60);
2087 Executor::flush_instance(&ex1);
2088 assert!(done1.get());
2089 }
2090}