1use std::any::{Any, TypeId};
6use std::cell::{Cell, RefCell};
7use std::collections::{HashMap, VecDeque};
8use std::fmt;
9use std::future::Future;
10use std::rc::{Rc, Weak};
11
12use auralis_signal::{Memo, Signal};
13
14use crate::executor;
15use crate::Priority;
16
/// Identifier of a task scope, unique within the allocating thread.
type ScopeId = u64;
/// Identifier of a task as handed out by the executor.
type TaskId = u64;

thread_local! {
    /// Next scope id to hand out on this thread; numbering starts at 1.
    static NEXT_SCOPE_ID: Cell<ScopeId> = const { Cell::new(1) };
}

/// Returns the next unused scope id for the current thread and advances the
/// per-thread counter by one.
fn alloc_scope_id() -> ScopeId {
    // `replace` stores the incremented value and yields the one being handed out.
    NEXT_SCOPE_ID.with(|counter| counter.replace(counter.get() + 1))
}
35
/// Owns a one-shot cleanup closure and runs it when the handle is dropped.
pub struct CallbackHandle {
    /// The pending cleanup; `None` for a no-op handle or once it has been taken.
    cleanup: Option<Box<dyn FnOnce() + 'static>>,
}

impl CallbackHandle {
    /// Wraps `cleanup` so that it is invoked exactly once, when the returned
    /// handle is dropped.
    pub fn new(cleanup: impl FnOnce() + 'static) -> Self {
        let boxed: Box<dyn FnOnce() + 'static> = Box::new(cleanup);
        Self {
            cleanup: Some(boxed),
        }
    }

    /// Creates a handle whose drop does nothing.
    #[must_use]
    pub fn noop() -> Self {
        Self { cleanup: None }
    }
}

impl Drop for CallbackHandle {
    fn drop(&mut self) {
        let Some(run) = self.cleanup.take() else {
            return;
        };
        // A panicking cleanup is caught and discarded so that it cannot
        // unwind out of `drop`.
        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(run));
    }
}
76
/// Registry value for one scope: weak handles to its shared state and to its
/// suspension flag. Being weak, a registry entry never keeps a scope alive.
type ScopeRegistryEntry = (Weak<RefCell<TaskScopeInner>>, Weak<Cell<bool>>);

thread_local! {
    /// Per-thread map from scope id to the weak handles of that scope.
    static SCOPE_REGISTRY: RefCell<HashMap<ScopeId, ScopeRegistryEntry>> =
        RefCell::new(HashMap::new());
}
103
104fn register_scope(id: ScopeId, inner: &Rc<RefCell<TaskScopeInner>>, suspended: &Rc<Cell<bool>>) {
107 let _ = SCOPE_REGISTRY.try_with(|reg| {
108 if let Ok(mut r) = reg.try_borrow_mut() {
109 r.insert(id, (Rc::downgrade(inner), Rc::downgrade(suspended)));
110 }
111 });
112}
113
114fn unregister_scope(id: ScopeId) {
115 let _ = SCOPE_REGISTRY.try_with(|reg| {
116 if let Ok(mut r) = reg.try_borrow_mut() {
117 r.remove(&id);
118 }
119 });
120}
121
122#[must_use]
126pub fn find_scope(scope_id: ScopeId) -> Option<TaskScope> {
127 SCOPE_REGISTRY
128 .try_with(|reg| {
129 if let Ok(r) = reg.try_borrow() {
130 r.get(&scope_id).and_then(|(inner_weak, suspended_weak)| {
131 let inner = inner_weak.upgrade()?;
132 let suspended = suspended_weak.upgrade()?;
133 let cancelled = inner.borrow().cancelled.clone();
134 Some(TaskScope {
135 inner,
136 cancelled,
137 suspended,
138 })
139 })
140 } else {
141 None
142 }
143 })
144 .ok()
145 .flatten()
146}
147
/// Returns the debug label of scope `scope_id`, or `None` if the scope
/// cannot be found or has no label set.
#[cfg(feature = "debug")]
#[doc(hidden)]
#[must_use]
pub fn scope_debug_label(scope_id: ScopeId) -> Option<String> {
    let scope = find_scope(scope_id)?;
    // Copy the label out so the borrow ends before `scope` is dropped.
    let label = scope.inner.borrow().debug_label.clone();
    label
}
157
158#[doc(hidden)]
160pub fn clear_scope_registry() {
161 let _ = SCOPE_REGISTRY.try_with(|reg| {
162 if let Ok(mut r) = reg.try_borrow_mut() {
163 r.clear();
164 }
165 });
166}
167
/// Signature of the hook that installs (or clears, with `None`) the current scope.
type ScopeSetFn = fn(Option<TaskScope>);
/// Signature of the hook that reads the current scope.
type ScopeGetFn = fn() -> Option<TaskScope>;

/// Pair of plain function pointers that decide where the "current scope" is
/// stored. The default pair (see `ensure_default_store`) uses a thread-local.
pub struct ScopeStore {
    /// Replaces the current scope with the given value.
    pub set_fn: ScopeSetFn,
    /// Returns a handle to the current scope, if one is set.
    pub get_fn: ScopeGetFn,
}
192
193use std::sync::OnceLock;
194static SCOPE_STORE: OnceLock<ScopeStore> = OnceLock::new();
195
196fn ensure_default_store() -> &'static ScopeStore {
197 SCOPE_STORE.get_or_init(|| ScopeStore {
198 set_fn: thread_local_set,
199 get_fn: thread_local_get,
200 })
201}
202
203pub fn set_scope_store(store: ScopeStore) {
219 let _ = SCOPE_STORE.set(store);
220}
221
222thread_local! {
249 static CURRENT_SCOPE: RefCell<Option<TaskScope>> = const { RefCell::new(None) };
250}
251
252fn thread_local_set(scope: Option<TaskScope>) {
253 CURRENT_SCOPE.with(|cell| {
254 cell.replace(scope);
255 });
256}
257
258fn thread_local_get() -> Option<TaskScope> {
259 CURRENT_SCOPE.with(|cell| cell.borrow().clone())
260}
261
262pub(crate) fn set_scope_direct(scope: Option<TaskScope>) {
267 let store = ensure_default_store();
268 (store.set_fn)(scope);
269}
270
271pub(crate) fn get_scope_direct() -> Option<TaskScope> {
273 let store = ensure_default_store();
274 (store.get_fn)()
275}
276
/// Installs a scope store backed by a Tokio task-local instead of the
/// default thread-local.
///
/// Like every call to `set_scope_store`, this only takes effect if no store
/// has been installed yet.
#[cfg(feature = "ssr-tokio")]
pub fn init_scope_store_tokio() {
    tokio::task_local! {
        static TK_SCOPE: std::cell::RefCell<Option<TaskScope>>;
    }

    // Clears the slot if one is active for the calling task; the `Err` returned
    // when no task-local value is set is ignored.
    let _ = TK_SCOPE.try_with(|cell| {
        cell.replace(None);
    });

    // NOTE(review): `TK_SCOPE` is private to this function and no
    // `TK_SCOPE.scope(..)` call is visible here, so every `try_with` below looks
    // like it would fail (making `set_fn` a no-op and `get_fn` always `None`).
    // Confirm how the task-local is meant to be populated.
    set_scope_store(ScopeStore {
        // Stores the scope in the task-local slot; silently does nothing when
        // the slot is not available.
        set_fn: |s| {
            let _ = TK_SCOPE.try_with(|cell| {
                cell.replace(s);
            });
        },
        // Reads the task-local slot; an unavailable slot reads as "no scope".
        get_fn: || {
            TK_SCOPE
                .try_with(|cell| cell.borrow().clone())
                .ok()
                .flatten()
        },
    });
}
317
318pub fn with_current_scope<R>(scope: &TaskScope, f: impl FnOnce() -> R) -> R {
328 let store = ensure_default_store();
329 let prev = (store.get_fn)();
330 (store.set_fn)(Some(scope.clone_inner()));
331 let result = f();
332 (store.set_fn)(prev);
333 result
334}
335
336#[must_use]
338pub fn current_scope() -> Option<TaskScope> {
339 let store = ensure_default_store();
340 (store.get_fn)()
341}
342
/// State shared by every handle to one scope.
struct TaskScopeInner {
    /// Id under which the scope is registered in the scope registry.
    id: ScopeId,
    /// Ids of tasks spawned through this scope; entries are appended on spawn
    /// and taken out when the scope is torn down.
    task_ids: Vec<TaskId>,
    /// Strong handles to child scopes, which keep the children alive for as
    /// long as this scope exists.
    children: Vec<TaskScope>,
    /// Weak back-pointer to the parent scope; `None` for a root scope.
    parent: Option<Weak<RefCell<TaskScopeInner>>>,
    /// Context values keyed by their type. Has its own `RefCell` so values can
    /// be inserted while the surrounding state is only immutably borrowed.
    context: RefCell<HashMap<TypeId, Rc<dyn Any>>>,
    /// Cleanup handles owned by the scope; dropping one runs its cleanup.
    callbacks: RefCell<Vec<CallbackHandle>>,
    /// Cancellation flag, shared with the `cancelled` field of each handle.
    cancelled: Rc<Cell<bool>>,
    /// Optional human-readable label, only present with the `debug` feature.
    #[cfg(feature = "debug")]
    debug_label: Option<String>,
    /// Executor on which this scope's tasks are spawned and cancelled.
    executor: executor::ExecutorRef,
}
371
372pub struct JoinHandle {
382 task_id: Option<TaskId>,
383 executor: executor::ExecutorRef,
384}
385
386impl JoinHandle {
387 pub fn cancel(&self) {
392 if let Some(tid) = self.task_id {
393 executor::cancel_task(&self.executor, tid);
394 }
395 }
396
397 #[must_use]
402 pub fn is_finished(&self) -> bool {
403 match self.task_id {
404 Some(tid) => executor::is_task_finished(&self.executor, tid),
405 None => true,
406 }
407 }
408
409 #[must_use]
412 pub fn task_id(&self) -> Option<TaskId> {
413 self.task_id
414 }
415}
416
417impl fmt::Debug for JoinHandle {
418 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
419 f.debug_struct("JoinHandle")
420 .field("task_id", &self.task_id)
421 .finish_non_exhaustive()
422 }
423}
424
425#[must_use]
451pub struct TaskScope {
452 inner: Rc<RefCell<TaskScopeInner>>,
453 cancelled: Rc<Cell<bool>>,
459 suspended: Rc<Cell<bool>>,
462}
463
464impl TaskScope {
465 pub fn new() -> Self {
469 Self::with_executor(&executor::current_executor_instance())
470 }
471
472 pub fn with_executor(ex: &executor::ExecutorRef) -> Self {
478 let cancelled = Rc::new(Cell::new(false));
479 let inner = Rc::new(RefCell::new(TaskScopeInner {
480 id: alloc_scope_id(),
481 task_ids: Vec::new(),
482 children: Vec::new(),
483 parent: None,
484 context: RefCell::new(HashMap::new()),
485 callbacks: RefCell::new(Vec::new()),
486 cancelled: Rc::clone(&cancelled),
487 #[cfg(feature = "debug")]
488 debug_label: None,
489 executor: Rc::clone(ex),
490 }));
491 let id = inner.borrow().id;
492 let suspended = Rc::new(Cell::new(false));
493 register_scope(id, &inner, &suspended);
494 Self {
495 inner,
496 cancelled,
497 suspended,
498 }
499 }
500
501 pub fn new_child(parent: &Self) -> Self {
503 let ex = parent.inner.borrow().executor.clone();
504 let cancelled = Rc::new(Cell::new(false));
505 let inner = Rc::new(RefCell::new(TaskScopeInner {
506 id: alloc_scope_id(),
507 task_ids: Vec::new(),
508 children: Vec::new(),
509 parent: Some(Rc::downgrade(&parent.inner)),
510 context: RefCell::new(HashMap::new()),
511 callbacks: RefCell::new(Vec::new()),
512 cancelled: Rc::clone(&cancelled),
513 #[cfg(feature = "debug")]
514 debug_label: None,
515 executor: ex,
516 }));
517 let id = inner.borrow().id;
518 let suspended = Rc::new(Cell::new(false));
519 register_scope(id, &inner, &suspended);
520 let child = Self {
521 inner,
522 cancelled,
523 suspended,
524 };
525 parent.inner.borrow_mut().children.push(child.clone_inner());
526 child
527 }
528
529 pub fn spawn(&self, future: impl Future<Output = ()> + 'static) -> JoinHandle {
535 self.spawn_with_priority(Priority::Low, future)
536 }
537
538 pub fn spawn_with_priority(
546 &self,
547 priority: Priority,
548 future: impl Future<Output = ()> + 'static,
549 ) -> JoinHandle {
550 let (cancelled, ex, scope_id) = {
556 let inner = self.inner.borrow();
557 (inner.cancelled.get(), Rc::clone(&inner.executor), inner.id)
558 };
559 if cancelled {
560 return JoinHandle {
561 task_id: None,
562 executor: ex,
563 };
564 }
565 let task_id = executor::with_executor(&ex, || {
566 with_current_scope(self, || {
567 executor::spawn_scoped_on(&ex, priority, scope_id, future)
568 })
569 });
570 self.inner.borrow_mut().task_ids.push(task_id);
571 JoinHandle {
572 task_id: Some(task_id),
573 executor: ex,
574 }
575 }
576
577 pub fn watch<T: Clone + 'static>(
590 &self,
591 sig: &Signal<T>,
592 f: impl FnMut(&T) + 'static,
593 ) -> JoinHandle {
594 let s = sig.clone();
595 let mut f = f;
596 self.spawn(async move {
597 loop {
598 s.changed().await;
599 f(&s.read());
600 }
601 })
602 }
603
604 pub fn watch_effect(&self, effect: impl Fn() + 'static) -> JoinHandle {
613 let memo = Memo::new(effect);
614 self.spawn(async move {
615 loop {
616 memo.changed().await;
617 #[allow(clippy::let_unit_value, clippy::ignored_unit_patterns)]
618 let _ = memo.read();
619 }
620 })
621 }
622
623 pub fn register_callback_handle(&self, handle: CallbackHandle) {
631 let inner = self.inner.borrow();
632 if inner.cancelled.get() {
633 return;
634 }
635 inner.callbacks.borrow_mut().push(handle);
636 }
637
638 pub fn on_cleanup(&self, f: impl FnOnce() + 'static) {
647 self.register_callback_handle(CallbackHandle::new(f));
648 }
649
650 pub fn provide<T: 'static>(&self, value: T) {
658 self.inner
659 .borrow()
660 .context
661 .borrow_mut()
662 .insert(TypeId::of::<T>(), Rc::new(value));
663 }
664
665 #[must_use]
670 pub fn consume<T: 'static>(&self) -> Option<Rc<T>> {
671 let mut current = Some(Rc::clone(&self.inner));
672
673 while let Some(inner) = current {
674 {
676 let inner_ref = inner.borrow();
677 let ctx = inner_ref.context.borrow();
678 if let Some(val) = ctx.get(&TypeId::of::<T>()) {
679 if let Ok(downcast) = val.clone().downcast::<T>() {
680 return Some(downcast);
681 }
682 }
683 }
684
685 let parent = {
687 let inner_ref = inner.borrow();
688 inner_ref.parent.as_ref().and_then(Weak::upgrade)
689 };
690 current = parent;
691 }
692
693 None
694 }
695
696 #[must_use]
703 #[track_caller]
704 pub fn expect_context<T: 'static>(&self) -> Rc<T> {
705 self.consume::<T>()
706 .unwrap_or_else(|| panic!("context not found: {}", std::any::type_name::<T>()))
707 }
708
709 #[must_use]
713 pub fn is_cancelled(&self) -> bool {
714 self.cancelled.get()
715 }
716
717 #[cfg(feature = "debug")]
723 pub fn set_debug_label(&self, label: impl Into<String>) {
724 self.inner.borrow_mut().debug_label = Some(label.into());
725 }
726
727 #[cfg(test)]
731 #[must_use]
732 pub fn task_count(&self) -> usize {
733 self.inner.borrow().task_ids.len()
734 }
735
736 #[cfg(test)]
738 #[must_use]
739 pub fn child_count(&self) -> usize {
740 self.inner.borrow().children.len()
741 }
742
743 fn clone_inner(&self) -> Self {
746 Self {
747 inner: Rc::clone(&self.inner),
748 cancelled: Rc::clone(&self.cancelled),
749 suspended: Rc::clone(&self.suspended),
750 }
751 }
752
753 pub fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
758 with_current_scope(self, f)
759 }
760
761 pub fn suspend(&self) {
771 if self.suspended.get() {
772 return;
773 }
774 self.suspended.set(true);
775 let children: Vec<TaskScope> = {
777 self.inner
778 .borrow()
779 .children
780 .iter()
781 .map(TaskScope::clone_inner)
782 .collect()
783 };
784 for child in &children {
785 child.suspend();
786 }
787 }
788
789 pub fn resume(&self) {
794 if !self.suspended.get() {
795 return;
796 }
797 self.suspended.set(false);
798
799 let (task_ids, children) = {
800 let inner = self.inner.borrow();
801 let tids = inner.task_ids.clone();
802 let children: Vec<TaskScope> =
803 inner.children.iter().map(TaskScope::clone_inner).collect();
804 (tids, children)
805 };
806
807 let ex = Rc::clone(&self.inner.borrow().executor);
809 executor::enqueue_scope_tasks_on(&ex, &task_ids);
810
811 for child in &children {
813 child.resume();
814 }
815 }
816
817 #[must_use]
819 pub fn is_suspended(&self) -> bool {
820 self.suspended.get()
821 }
822}
823
824impl Default for TaskScope {
825 fn default() -> Self {
826 Self::new()
827 }
828}
829
830impl Clone for TaskScope {
831 fn clone(&self) -> Self {
832 self.clone_inner()
833 }
834}
835
// Teardown of a scope. Only the drop of the LAST handle does any work: it
// cancels the scope and all of its descendants, dropping callbacks, tasks and
// context values along the way.
impl Drop for TaskScope {
    fn drop(&mut self) {
        // Other handles (clones, or the parent's entry in `children`) still
        // reference the shared state, so this is not the final drop.
        if Rc::strong_count(&self.inner) > 1 {
            return;
        }

        // Set through the handle's own pointer, so the flag is raised even if
        // the borrow below fails.
        self.cancelled.set(true);

        let Ok(mut inner) = self.inner.try_borrow_mut() else {
            eprintln!(
                "[auralis-task] WARNING: TaskScope::drop cannot borrow inner \
                 (already borrowed). Tasks and callbacks in this scope will \
                 be cleaned up on the next executor flush. Avoid dropping \
                 the last TaskScope clone inside a callback."
            );
            return;
        };

        // This scope's own cleanup callbacks run first, before anything else
        // is torn down.
        inner.callbacks.borrow_mut().clear();

        // Collect every descendant breadth-first with an explicit queue, so
        // the depth of the scope tree does not translate into recursion depth.
        let mut descendants: Vec<Rc<RefCell<TaskScopeInner>>> = Vec::new();
        {
            let mut queue: VecDeque<Rc<RefCell<TaskScopeInner>>> = VecDeque::new();
            for child in &inner.children {
                queue.push_back(Rc::clone(&child.inner));
            }

            while let Some(scope_rc) = queue.pop_front() {
                let scope = scope_rc.borrow();
                for child in &scope.children {
                    queue.push_back(Rc::clone(&child.inner));
                }
                descendants.push(Rc::clone(&scope_rc));
            }
        }

        // Walk the breadth-first list backwards so the deepest scopes are
        // cancelled before their ancestors.
        // NOTE(review): `scope` stays mutably borrowed while its callbacks and
        // task futures are dropped below, so a destructor that re-enters the
        // same scope through `borrow()` would panic — confirm that cannot happen.
        for scope_rc in descendants.iter().rev() {
            let mut scope = scope_rc.borrow_mut();
            if scope.cancelled.get() {
                // Already cancelled: skipped entirely, including unregistering.
                continue;
            }
            scope.cancelled.set(true);

            // Callbacks go before tasks, same order as for the root scope.
            scope.callbacks.borrow_mut().clear();

            if !scope.task_ids.is_empty() {
                let ex = Rc::clone(&scope.executor);
                let task_ids = std::mem::take(&mut scope.task_ids);
                // The executor hands the cancelled futures back; they are
                // dropped here, after the call has returned.
                let dropped_futures = executor::cancel_scope_tasks_on(&ex, &task_ids);
                drop(dropped_futures);
            }
            scope.context.borrow_mut().clear();
            unregister_scope(scope.id);
        }

        // Finally this scope's own tasks, context, child handles and registry
        // entry.
        if !inner.task_ids.is_empty() {
            let ex = Rc::clone(&inner.executor);
            let task_ids = std::mem::take(&mut inner.task_ids);
            let dropped_futures = executor::cancel_scope_tasks_on(&ex, &task_ids);
            drop(dropped_futures);
        }

        inner.context.borrow_mut().clear();
        // The child handles dropped here return early from their own `drop`,
        // because `descendants` still holds a reference to each child's state.
        inner.children.clear();

        unregister_scope(inner.id);
    }
}
927
/// Provides a context value on a scope: `provide_context!(scope, value)`
/// expands to `scope.provide(value)`.
///
/// A trailing comma after the value is accepted.
#[macro_export]
macro_rules! provide_context {
    ($scope:expr, $value:expr $(,)?) => {
        $scope.provide($value)
    };
}
943
/// Looks up a context value on a scope: `consume_context!(scope, Type)`
/// expands to `scope.consume::<Type>()`.
///
/// A trailing comma after the type is accepted.
#[macro_export]
macro_rules! consume_context {
    ($scope:expr, $ty:ty $(,)?) => {
        $scope.consume::<$ty>()
    };
}
955
956#[cfg(test)]
961#[allow(clippy::items_after_statements)]
962mod tests {
963 use super::*;
964 use crate::executor::{self, init_flush_scheduler, reset_executor_for_test, TestScheduleFlush};
965 use crate::{init_time_source, ScheduleFlush, TestTimeSource, TimeSource};
966 use auralis_signal::Signal;
967 use std::cell::{Cell, RefCell};
968 use std::rc::Rc;
969 use std::time::Duration;
970
971 fn init() {
972 reset_executor_for_test();
973 init_flush_scheduler(Rc::new(TestScheduleFlush));
974 }
975
976 #[test]
979 fn new_scope_has_zero_tasks() {
980 let scope = TaskScope::new();
981 assert_eq!(scope.task_count(), 0);
982 assert_eq!(scope.child_count(), 0);
983 }
984
985 #[test]
986 fn new_child_attaches_to_parent() {
987 let parent = TaskScope::new();
988 let _child = TaskScope::new_child(&parent);
989 assert_eq!(parent.child_count(), 1);
990 }
991
992 #[test]
993 fn spawn_adds_task() {
994 init();
995 let scope = TaskScope::new();
996 scope.spawn(async {});
997 assert_eq!(scope.task_count(), 1);
998 }
999
1000 #[test]
1001 fn spawn_and_complete() {
1002 init();
1003 let done = Rc::new(Cell::new(false));
1004 let done2 = Rc::clone(&done);
1005 spawn_global(async move {
1006 done2.set(true);
1007 });
1008 assert!(done.get());
1009 }
1010
1011 #[test]
1012 fn scope_spawn_and_cancel() {
1013 init();
1014 let dropped = Rc::new(Cell::new(false));
1015 {
1016 let scope = TaskScope::new();
1017 let d = Rc::clone(&dropped);
1018 struct DropCheck(Rc<Cell<bool>>);
1019 impl Drop for DropCheck {
1020 fn drop(&mut self) {
1021 self.0.set(true);
1022 }
1023 }
1024 scope.spawn(async move {
1025 let _guard = DropCheck(d);
1026 std::future::pending::<()>().await;
1027 });
1028 assert_eq!(executor::debug_task_count(), 1);
1029 }
1030 assert!(dropped.get());
1031 assert_eq!(executor::debug_task_count(), 0);
1032 }
1033
1034 #[test]
1035 fn nested_scope_child_cancel_with_parent() {
1036 init();
1037 let dropped_child = Rc::new(Cell::new(false));
1038 {
1039 let parent = TaskScope::new();
1040 let child = TaskScope::new_child(&parent);
1041 let d = Rc::clone(&dropped_child);
1042 struct DropCheck(Rc<Cell<bool>>);
1043 impl Drop for DropCheck {
1044 fn drop(&mut self) {
1045 self.0.set(true);
1046 }
1047 }
1048 child.spawn(async move {
1049 let _guard = DropCheck(d);
1050 std::future::pending::<()>().await;
1051 });
1052 assert_eq!(executor::debug_task_count(), 1);
1053 }
1054 assert!(dropped_child.get());
1055 assert_eq!(executor::debug_task_count(), 0);
1056 }
1057
1058 #[test]
1059 fn deeply_nested_scope_drop_no_stack_overflow() {
1060 init();
1061 let root = TaskScope::new();
1062 {
1063 let mut current = TaskScope::new_child(&root);
1064 for _ in 0..199 {
1065 current = TaskScope::new_child(¤t);
1066 }
1067 }
1068 drop(root);
1069 assert_eq!(executor::debug_task_count(), 0);
1070 }
1071
1072 #[test]
1073 fn scope_child_explicit_tree() {
1074 let root = TaskScope::new();
1075 let a = TaskScope::new_child(&root);
1076 let b = TaskScope::new_child(&root);
1077 let _a1 = TaskScope::new_child(&a);
1078 let _a2 = TaskScope::new_child(&a);
1079 assert_eq!(root.child_count(), 2);
1080 assert_eq!(a.child_count(), 2);
1081 assert_eq!(b.child_count(), 0);
1082 }
1083
1084 #[test]
1087 fn callback_handle_dropped_before_tasks() {
1088 init();
1089 let dropped_order: Rc<RefCell<Vec<String>>> = Rc::new(RefCell::new(Vec::new()));
1090 {
1091 let scope = TaskScope::new();
1092 let order1 = Rc::clone(&dropped_order);
1093 scope.register_callback_handle(CallbackHandle::new(move || {
1094 order1.borrow_mut().push("callback".to_string());
1095 }));
1096 let order2 = Rc::clone(&dropped_order);
1097 struct DropCheck {
1098 order: Rc<RefCell<Vec<String>>>,
1099 label: String,
1100 }
1101 impl Drop for DropCheck {
1102 fn drop(&mut self) {
1103 self.order.borrow_mut().push(self.label.clone());
1104 }
1105 }
1106 scope.spawn(async move {
1107 let _guard = DropCheck {
1108 order: order2,
1109 label: "task".to_string(),
1110 };
1111 std::future::pending::<()>().await;
1112 });
1113 }
1114 let order = dropped_order.borrow().clone();
1115 assert_eq!(order, vec!["callback", "task"]);
1116 }
1117
1118 #[test]
1119 fn callback_handle_cleaned_up_on_child_scope_drop() {
1120 init();
1121 let called = Rc::new(Cell::new(false));
1122 {
1123 let parent = TaskScope::new();
1124 let child = TaskScope::new_child(&parent);
1125 let c = Rc::clone(&called);
1126 child.register_callback_handle(CallbackHandle::new(move || {
1127 c.set(true);
1128 }));
1129 }
1131 assert!(called.get());
1132 }
1133
1134 #[test]
1137 fn context_provide_and_consume_in_same_scope() {
1138 let scope = TaskScope::new();
1139 scope.provide(42i32);
1140 assert_eq!(*scope.consume::<i32>().unwrap(), 42);
1141 }
1142
1143 #[test]
1144 fn context_consume_walks_up_to_parent() {
1145 let parent = TaskScope::new();
1146 parent.provide("hello".to_string());
1147 let child = TaskScope::new_child(&parent);
1148 assert_eq!(*child.consume::<String>().unwrap(), "hello");
1149 }
1150
1151 #[test]
1152 fn context_consume_not_found() {
1153 let scope = TaskScope::new();
1154 assert!(scope.consume::<i32>().is_none());
1155 }
1156
1157 #[test]
1158 fn context_removed_on_scope_drop() {
1159 let parent = TaskScope::new();
1160 parent.provide(99u32);
1161 {
1162 let _child = TaskScope::new_child(&parent);
1163 }
1165 assert_eq!(*parent.consume::<u32>().unwrap(), 99);
1167 }
1168
1169 #[test]
1170 fn context_shadowing() {
1171 let parent = TaskScope::new();
1172 parent.provide(1i32);
1173 let child = TaskScope::new_child(&parent);
1174 child.provide(2i32);
1175 assert_eq!(*child.consume::<i32>().unwrap(), 2);
1177 assert_eq!(*parent.consume::<i32>().unwrap(), 1);
1179 }
1180
1181 #[test]
1182 #[should_panic(expected = "context not found")]
1183 fn expect_context_panics_when_missing() {
1184 let scope = TaskScope::new();
1185 let _ = scope.expect_context::<String>();
1186 }
1187
1188 #[test]
1191 fn executor_priority_ordering() {
1192 init();
1193 let order = Rc::new(RefCell::new(Vec::new()));
1194 let o1 = Rc::clone(&order);
1195 executor::spawn_no_auto_flush(Priority::Low, async move {
1196 o1.borrow_mut().push("low");
1197 });
1198 let o2 = Rc::clone(&order);
1199 executor::spawn_no_auto_flush(Priority::High, async move {
1200 o2.borrow_mut().push("high");
1201 });
1202 executor::flush_all();
1203 let result = order.borrow().clone();
1204 assert_eq!(result, vec!["high", "low"]);
1205 }
1206
1207 #[test]
1208 fn executor_batch() {
1209 init();
1210 let counter = Rc::new(Cell::new(0u32));
1211 for _ in 0..10 {
1212 let c = Rc::clone(&counter);
1213 spawn_global(async move {
1214 c.set(c.get() + 1);
1215 });
1216 }
1217 assert_eq!(counter.get(), 10);
1218 assert_eq!(executor::debug_task_count(), 0);
1219 }
1220
1221 #[test]
1222 fn no_leak_on_cancel() {
1223 init();
1224 for _ in 0..50 {
1225 let scope = TaskScope::new();
1226 for _ in 0..5 {
1227 scope.spawn(std::future::pending::<()>());
1228 }
1229 }
1230 assert_eq!(executor::debug_task_count(), 0);
1231 }
1232
1233 #[test]
1234 fn set_deferred_triggers_after_flush() {
1235 use auralis_signal::Signal;
1236 init();
1237 let sig = Signal::new(0);
1238 let observed = Rc::new(Cell::new(0));
1239 set_deferred(&sig, 42);
1240 assert_eq!(sig.read(), 42);
1241 let ob1 = Rc::clone(&observed);
1242 spawn_global(async move {
1243 ob1.set(sig.read());
1244 });
1245 assert_eq!(observed.get(), 42);
1246 }
1247
1248 #[test]
1249 fn set_deferred_in_drop_safe() {
1250 use auralis_signal::Signal;
1251 init();
1252 let sig = Signal::new(0);
1253 struct SetOnDrop {
1254 sig: Signal<i32>,
1255 val: i32,
1256 }
1257 impl Drop for SetOnDrop {
1258 fn drop(&mut self) {
1259 set_deferred(&self.sig, self.val);
1260 }
1261 }
1262 let guard = SetOnDrop {
1263 sig: sig.clone(),
1264 val: 99,
1265 };
1266 drop(guard);
1267 assert_eq!(sig.read(), 99);
1268 }
1269
1270 #[test]
1271 fn set_deferred_from_drop_guard_during_scope_cancel() {
1272 use auralis_signal::Signal;
1273 init();
1274
1275 let sig = Signal::new(0i32);
1276
1277 struct ResetOnDrop {
1281 sig: Signal<i32>,
1282 }
1283 impl Drop for ResetOnDrop {
1284 fn drop(&mut self) {
1285 set_deferred(&self.sig, 42);
1286 }
1287 }
1288
1289 {
1290 let scope = TaskScope::new();
1291 let s = sig.clone();
1292 scope.spawn(async move {
1293 let _guard = ResetOnDrop { sig: s };
1294 std::future::pending::<()>().await;
1297 });
1298 }
1300
1301 assert_eq!(
1303 sig.read(),
1304 42,
1305 "set_deferred should have fired after scope cancel"
1306 );
1307 }
1308
1309 #[test]
1310 fn yield_now_gives_other_tasks_a_turn() {
1311 init();
1312 let order = Rc::new(RefCell::new(Vec::new()));
1313 let o1 = Rc::clone(&order);
1314 executor::spawn_no_auto_flush(Priority::Low, async move {
1315 o1.borrow_mut().push("a1");
1316 executor::yield_now().await;
1317 o1.borrow_mut().push("a2");
1318 });
1319 let o2 = Rc::clone(&order);
1320 executor::spawn_no_auto_flush(Priority::Low, async move {
1321 o2.borrow_mut().push("b1");
1322 o2.borrow_mut().push("b2");
1323 });
1324 executor::flush_all();
1325 let r = order.borrow().clone();
1326 assert_eq!(&r[0..3], &["a1", "b1", "b2"][..]);
1327 assert!(r.contains(&"a2"));
1328 }
1329
1330 #[test]
1331 fn panic_in_task_is_isolated() {
1332 init();
1333 let survived = Rc::new(Cell::new(false));
1334 let s = Rc::clone(&survived);
1335 spawn_global(async move {
1336 panic!("intentional test panic");
1337 });
1338 spawn_global(async move {
1339 s.set(true);
1340 });
1341 assert!(survived.get());
1342 assert_eq!(executor::debug_task_count(), 0);
1343 }
1344
1345 #[test]
1348 fn time_budget_with_test_time_source() {
1349 init();
1350 let ts = Rc::new(TestTimeSource::new(0));
1351 init_time_source(ts.clone());
1352
1353 let polled = Rc::new(Cell::new(0u32));
1354
1355 for _ in 0..50 {
1358 let pc = Rc::clone(&polled);
1359 let ts_c = Rc::clone(&ts);
1360 executor::spawn_no_auto_flush(Priority::Low, async move {
1361 pc.set(pc.get() + 1);
1362 ts_c.advance(1);
1363 });
1364 }
1365
1366 executor::flush_all();
1370
1371 assert_eq!(polled.get(), 50);
1372 assert_eq!(executor::debug_task_count(), 0);
1373 }
1374
1375 #[test]
1376 fn time_budget_honoured_with_split() {
1377 let schedule_count = Rc::new(Cell::new(0u32));
1381 struct NoopScheduleFlush(Rc<Cell<u32>>);
1382 impl ScheduleFlush for NoopScheduleFlush {
1383 fn schedule(&self, _callback: Box<dyn FnOnce()>) {
1384 self.0.set(self.0.get() + 1);
1385 }
1388 }
1389 init_flush_scheduler(Rc::new(NoopScheduleFlush(Rc::clone(&schedule_count))));
1390
1391 let ts = Rc::new(TestTimeSource::new(0));
1392 init_time_source(ts.clone());
1393
1394 let polled = Rc::new(RefCell::new(Vec::new()));
1395
1396 for i in 0..50u32 {
1397 let pc = Rc::clone(&polled);
1398 let ts_c = Rc::clone(&ts);
1399 executor::spawn_no_auto_flush(Priority::Low, async move {
1400 pc.borrow_mut().push(i);
1401 ts_c.advance(1);
1402 });
1403 }
1404
1405 executor::flush_all();
1406
1407 let completed = polled.borrow().len();
1408 assert!(
1409 completed < 50,
1410 "budget should split before all tasks run (only {completed} of 50)"
1411 );
1412 assert!(
1413 completed >= 7,
1414 "at least 7 tasks should run before budget expires ({completed})"
1415 );
1416 assert_eq!(
1417 schedule_count.get(),
1418 1,
1419 "next flush should have been scheduled exactly once"
1420 );
1421
1422 init_flush_scheduler(Rc::new(TestScheduleFlush));
1425 executor::flush_all();
1426 assert_eq!(executor::debug_task_count(), 0);
1427 }
1428
1429 #[test]
1432 fn provide_context_macro_works() {
1433 let scope = TaskScope::new();
1434 provide_context!(scope, 42i32);
1435 assert_eq!(*scope.consume::<i32>().unwrap(), 42);
1436 }
1437
1438 #[test]
1439 fn consume_context_macro_works() {
1440 let scope = TaskScope::new();
1441 scope.provide(99u32);
1442 let val: Option<Rc<u32>> = consume_context!(scope, u32);
1443 assert_eq!(*val.unwrap(), 99);
1444 }
1445
1446 #[test]
1447 fn consume_context_macro_not_found() {
1448 let scope = TaskScope::new();
1449 let val: Option<Rc<String>> = consume_context!(scope, String);
1450 assert!(val.is_none());
1451 }
1452
1453 #[cfg(feature = "debug")]
1456 #[test]
1457 fn dump_task_tree_returns_string() {
1458 init();
1459 let scope = TaskScope::new();
1460 scope.spawn(async { std::future::pending::<()>().await });
1461
1462 let output = crate::dump_task_tree();
1463 assert!(output.contains("Auralis Task Tree"));
1464 assert!(output.contains("Total active tasks: 1"));
1465 assert!(output.contains("Scope"));
1466 }
1467
1468 #[cfg(feature = "debug")]
1469 #[test]
1470 fn dump_task_tree_empty() {
1471 init();
1472 let output = crate::dump_task_tree();
1473 assert!(output.contains("(no active tasks)"));
1474 }
1475
1476 use crate::{set_deferred, spawn_global};
1477
1478 #[test]
1481 fn suspend_prevents_task_execution() {
1482 init();
1483 let scope = TaskScope::new();
1484 let executed = Rc::new(Cell::new(false));
1485 let ex = Rc::clone(&executed);
1486 scope.spawn(async move {
1487 ex.set(true);
1488 });
1489 assert!(executed.get());
1491 executed.set(false);
1492
1493 scope.suspend();
1494 let ex2 = Rc::clone(&executed);
1495 scope.spawn(async move {
1496 ex2.set(true);
1497 });
1498 assert!(!executed.get());
1500 }
1501
1502 #[test]
1503 fn resume_allows_task_execution() {
1504 init();
1505 let scope = TaskScope::new();
1506 scope.suspend();
1507 let executed = Rc::new(Cell::new(false));
1508 let ex = Rc::clone(&executed);
1509 scope.spawn(async move {
1510 ex.set(true);
1511 });
1512 assert!(!executed.get());
1513
1514 scope.resume();
1515 assert!(executed.get());
1517 }
1518
1519 #[test]
1520 fn suspend_cascades_to_children() {
1521 init();
1522 let parent = TaskScope::new();
1523 let child = TaskScope::new_child(&parent);
1524 assert!(!child.is_suspended());
1525
1526 parent.suspend();
1527 assert!(parent.is_suspended());
1528 assert!(child.is_suspended());
1529 }
1530
1531 #[test]
1532 fn resume_cascades_to_children() {
1533 init();
1534 let parent = TaskScope::new();
1535 let child = TaskScope::new_child(&parent);
1536 parent.suspend();
1537 assert!(child.is_suspended());
1538
1539 parent.resume();
1540 assert!(!parent.is_suspended());
1541 assert!(!child.is_suspended());
1542 }
1543
1544 #[test]
1545 fn multiple_suspend_resume_no_leak() {
1546 init();
1547 let scope = TaskScope::new();
1548 for _ in 0..50 {
1549 scope.suspend();
1550 assert!(scope.is_suspended());
1551 scope.resume();
1552 assert!(!scope.is_suspended());
1553 }
1554 }
1556
1557 #[test]
1558 fn suspended_scope_drops_without_panic() {
1559 init();
1560 {
1561 let scope = TaskScope::new();
1562 scope.suspend();
1563 let d = Rc::new(Cell::new(false));
1564 struct DropCheck(Rc<Cell<bool>>);
1565 impl Drop for DropCheck {
1566 fn drop(&mut self) {
1567 self.0.set(true);
1568 }
1569 }
1570 scope.spawn(async move {
1571 let _guard = DropCheck(d);
1572 std::future::pending::<()>().await;
1573 });
1574 }
1577 assert_eq!(executor::debug_task_count(), 0);
1579 }
1580
1581 #[test]
1582 fn siblings_not_affected_by_suspend() {
1583 init();
1584 let parent = TaskScope::new();
1585 let child_a = TaskScope::new_child(&parent);
1586 let child_b = TaskScope::new_child(&parent);
1587
1588 child_a.suspend();
1589 assert!(child_a.is_suspended());
1590 assert!(!child_b.is_suspended());
1591 assert!(!parent.is_suspended());
1592 }
1593
1594 use crate::Executor;
1597
1598 #[test]
1599 fn flush_instance_panicking_task_is_isolated() {
1600 init();
1601 let ex = Executor::new_instance();
1602 Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));
1603
1604 let survived = Rc::new(Cell::new(false));
1605 let s = Rc::clone(&survived);
1606
1607 Executor::spawn(&ex, async move {
1608 panic!("intentional test panic in instance executor");
1609 });
1610 Executor::spawn(&ex, async move {
1611 s.set(true);
1612 });
1613 Executor::flush_instance(&ex);
1614
1615 assert!(survived.get());
1616 }
1617
1618 #[test]
1619 fn flush_instance_spawn_and_complete() {
1620 init();
1621 let ex = Executor::new_instance();
1622 Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));
1623
1624 let counter = Rc::new(Cell::new(0u32));
1625 for _ in 0..20 {
1626 let c = Rc::clone(&counter);
1627 Executor::spawn(&ex, async move {
1628 c.set(c.get() + 1);
1629 });
1630 }
1631 Executor::flush_instance(&ex);
1632 assert_eq!(counter.get(), 20);
1633 }
1634
1635 use crate::timer;
1638
1639 #[test]
1640 fn timer_zero_duration_completes_immediately() {
1641 init();
1642 let done = Rc::new(Cell::new(false));
1643 let d = Rc::clone(&done);
1644 spawn_global(async move {
1645 timer::sleep(Duration::ZERO).await;
1646 d.set(true);
1647 });
1648 assert!(done.get());
1650 }
1651
1652 #[test]
1653 fn timer_normal_delay_fires_after_time_advances() {
1654 init();
1655 let ts = Rc::new(TestTimeSource::new(0));
1656 init_time_source(Rc::clone(&ts) as Rc<dyn TimeSource>);
1657
1658 let done = Rc::new(Cell::new(false));
1659 let d = Rc::clone(&done);
1660 spawn_global(async move {
1661 timer::sleep(Duration::from_millis(100)).await;
1662 d.set(true);
1663 });
1664 assert!(!done.get());
1666
1667 ts.advance(150);
1670 crate::executor::flush_all();
1671 assert!(done.get());
1672 }
1673
/// One task sleeps 100 ms three times in a row, bumping a counter after each
/// sleep. Every "advance the clock by 100, then flush" step is expected to
/// let exactly one more sleep complete.
#[test]
fn timer_across_multiple_flushes() {
    init();
    let clock = Rc::new(TestTimeSource::new(0));
    init_time_source(Rc::clone(&clock) as Rc<dyn TimeSource>);

    let ticks = Rc::new(Cell::new(0u32));
    spawn_global({
        let ticks = Rc::clone(&ticks);
        async move {
            for _ in 0..3 {
                timer::sleep(Duration::from_millis(100)).await;
                ticks.set(ticks.get() + 1);
            }
        }
    });
    // Nothing may have fired before the clock moves.
    assert_eq!(ticks.get(), 0);

    // Three identical rounds; the counter must track the round number.
    for expected in 1..=3u32 {
        clock.advance(100);
        crate::executor::flush_all();
        assert_eq!(ticks.get(), expected);
    }
}
1702
/// A task parked on a 500 ms sleep is owned by a `TaskScope` that goes out of
/// scope straight away. The code after the sleep must never run, and the
/// executor must report zero remaining tasks.
#[test]
fn timer_cancelled_by_scope_drop() {
    init();
    let ran = Rc::new(Cell::new(false));
    {
        let ran_in_task = Rc::clone(&ran);
        let scope = TaskScope::new();
        scope.spawn(async move {
            timer::sleep(Duration::from_millis(500)).await;
            ran_in_task.set(true);
        });
        // `scope` is dropped at the end of this block.
    }
    assert!(!ran.get());
    assert_eq!(executor::debug_task_count(), 0);
}
1720
/// Subscribes a callback to a signal and checks that it has run by the time
/// `set` returns.
///
/// NOTE(review): the body visible here never calls a flush from inside the
/// callback; presumably the re-entrancy in the test name refers to how signal
/// notifications are routed through the executor — confirm.
#[test]
fn reentrant_flush_is_noop() {
    init();
    let reentered = Rc::new(Cell::new(false));
    let marker = Rc::clone(&reentered);
    let signal = Signal::new(0);
    auralis_signal::subscribe(&signal, Rc::new(move || marker.set(true)));

    signal.set(1);
    assert!(reentered.get());
}
1740
/// Timer behaviour on an instance executor with its own time source: a 50 ms
/// sleep stays pending at time 0 and completes after the clock is advanced by
/// 60 and the instance is flushed.
#[test]
fn instance_executor_timer() {
    init();
    let instance = Executor::new_instance();
    Executor::install_flush_scheduler(&instance, Rc::new(TestScheduleFlush));
    let clock = Rc::new(TestTimeSource::new(0));
    Executor::install_time_source(&instance, Rc::clone(&clock) as Rc<dyn TimeSource>);

    let finished = Rc::new(Cell::new(false));
    Executor::spawn(&instance, {
        let finished = Rc::clone(&finished);
        async move {
            timer::sleep(Duration::from_millis(50)).await;
            finished.set(true);
        }
    });
    // Deadline not reached yet.
    assert!(!finished.get());

    clock.advance(60);
    Executor::flush_instance(&instance);
    assert!(finished.get());
}
1762
/// Calls `set_deferred` from a task running on an instance executor and
/// expects the written value to be readable once that instance is flushed.
#[test]
fn set_deferred_routes_to_instance_executor() {
    init();
    let instance = Executor::new_instance();
    Executor::install_flush_scheduler(&instance, Rc::new(TestScheduleFlush));

    let signal = Signal::new(0);
    let writer = signal.clone();

    Executor::spawn(&instance, async move {
        crate::set_deferred(&writer, 42);
    });

    Executor::flush_instance(&instance);
    assert_eq!(signal.read(), 42);
}
1782
/// Installs a panic hook that raises a flag, spawns a task that panics, and
/// expects the flag to be raised by the time `spawn` returns.
#[test]
fn panic_hook_is_invoked_on_task_panic() {
    init();
    let hook_called = Rc::new(Cell::new(false));
    let flag = Rc::clone(&hook_called);

    // The hook ignores the panic details and only records that it ran.
    crate::set_panic_hook(Rc::new(move |_info| flag.set(true)));

    let scope = TaskScope::new();
    scope.spawn(async move { panic!("intentional") });

    assert!(hook_called.get());
}
1801
/// Inside a task spawned on a `TaskScope`, `current_scope()` must return
/// `Some`.
#[test]
fn current_scope_available_in_spawned_task() {
    init();
    let scope = TaskScope::new();
    let found = Rc::new(Cell::new(false));
    scope.spawn({
        let probe = Rc::clone(&found);
        async move {
            // Record whether a scope is visible from within the task body.
            probe.set(crate::current_scope().is_some());
        }
    });
    assert!(found.get());
}
1813
/// Creating and dropping a no-op `CallbackHandle` (one with no cleanup
/// closure) must complete without panicking.
#[test]
fn callback_handle_noop_does_not_panic() {
    // Construct the handle and let it go immediately; its `Drop` runs here.
    drop(crate::CallbackHandle::noop());
}
1819
/// After `reset_executor_for_test()` (and without the usual `init()` call), a
/// signal subscriber is still expected to have run by the time `set` returns.
#[test]
fn sync_callback_fallback_without_schedule_hook() {
    crate::reset_executor_for_test();
    let signal = Signal::new(0);
    let called = Rc::new(Cell::new(false));
    let marker = Rc::clone(&called);
    auralis_signal::subscribe(&signal, Rc::new(move || marker.set(true)));

    signal.set(1);
    assert!(called.get());
}
1836
/// With two instance executors alive, a `set_deferred` issued under
/// `with_executor(&first, ..)` must land on its target signal after `first`
/// is flushed, while an unrelated signal keeps its initial value.
#[test]
fn set_deferred_isolated_to_instance_executor() {
    init();
    let first = Executor::new_instance();
    Executor::install_flush_scheduler(&first, Rc::new(TestScheduleFlush));
    // The second executor is set up but never flushed in this test.
    let second = Executor::new_instance();
    Executor::install_flush_scheduler(&second, Rc::new(TestScheduleFlush));

    let target = Signal::new(0);
    let untouched = Signal::new(0);
    let target_handle = target.clone();

    crate::with_executor(&first, || {
        crate::set_deferred(&target_handle, 42);
    });
    Executor::flush_instance(&first);

    assert_eq!(target.read(), 42);
    assert_eq!(untouched.read(), 0);
}
1858
/// A subscriber that writes to its own signal on its first invocation must be
/// notified a second time for that nested write: the signal ends at 2 and the
/// subscriber has run exactly twice.
#[test]
fn notify_signal_state_follow_up_handles_reentrant_dirty() {
    let source = Signal::new(0);
    let writer = source.clone();
    let calls = Rc::new(Cell::new(0u32));
    let tally = Rc::clone(&calls);

    auralis_signal::subscribe(
        &source,
        Rc::new(move || {
            let nth = tally.get() + 1;
            tally.set(nth);
            // Only the first notification writes back, so the chain stops
            // after one follow-up.
            if nth == 1 {
                writer.set(2);
            }
        }),
    );

    source.set(1);
    assert_eq!(source.read(), 2);
    assert_eq!(calls.get(), 2);
}
1885
/// A scoped task spawns a global task and then awaits a zero-length sleep.
/// The outer task is still expected to run through to the end.
#[test]
fn nested_spawn_preserves_outer_polling_task() {
    init();
    let executed = Rc::new(Cell::new(false));
    let reached_end = Rc::clone(&executed);

    let scope = TaskScope::new();
    scope.spawn(async move {
        // Nested spawn performed while the outer task is being polled.
        crate::spawn_global(async {});
        timer::sleep(Duration::ZERO).await;
        reached_end.set(true);
    });

    assert!(executed.get());
}
1911
/// Same shape as the zero-duration nested-spawn test, but with a 10 ms sleep:
/// the outer task must stay pending until the clock passes the deadline and
/// the executor is flushed, and must then finish.
#[test]
fn nested_spawn_preserves_polling_task_for_nonzero_timer() {
    init();
    let clock = Rc::new(TestTimeSource::new(0));
    init_time_source(Rc::clone(&clock) as Rc<dyn TimeSource>);

    let executed = Rc::new(Cell::new(false));
    let reached_end = Rc::clone(&executed);

    let scope = TaskScope::new();
    scope.spawn(async move {
        // Nested spawn first, then park on the timer.
        crate::spawn_global(async {});
        timer::sleep(Duration::from_millis(10)).await;
        reached_end.set(true);
    });

    // Still waiting on the timer at time 0.
    assert!(!executed.get());

    clock.advance(20);
    crate::executor::flush_all();
    assert!(executed.get());
}
1936
/// Queues two low-priority tasks with `spawn_no_auto_flush`, flushes once by
/// hand, and checks that both ran. Relative order is deliberately unchecked.
#[test]
fn batch_spawn_with_no_auto_flush_then_manual_flush() {
    init();

    let log = Rc::new(RefCell::new(Vec::new()));
    for tag in ["a", "b"] {
        let sink = Rc::clone(&log);
        executor::spawn_no_auto_flush(Priority::Low, async move {
            sink.borrow_mut().push(tag);
        });
    }

    executor::flush_all();

    // Snapshot the log so the `RefCell` borrow is released before asserting.
    let seen = log.borrow().clone();
    assert_eq!(seen.len(), 2);
    for tag in ["a", "b"] {
        assert!(seen.contains(&tag));
    }
}
1964
/// Spawns twenty trivial tasks with `spawn_global` and, without any explicit
/// flush, expects all of them to have run and the executor to hold no tasks.
#[test]
fn spawn_many_tasks_all_complete() {
    init();

    let completed = Rc::new(Cell::new(0u32));
    (0..20).for_each(|_| {
        let tally = Rc::clone(&completed);
        spawn_global(async move {
            tally.set(tally.get() + 1);
        });
    });

    assert_eq!(completed.get(), 20);
    assert_eq!(executor::debug_task_count(), 0);
}
1981
/// Ten times over: build an instance executor, run one task on it, and let
/// the executor drop at the end of the iteration. Every round must complete
/// its task.
#[test]
fn instance_executor_create_drop_recreate_works() {
    init();

    for _round in 0..10 {
        let instance = Executor::new_instance();
        Executor::install_flush_scheduler(&instance, Rc::new(TestScheduleFlush));

        let finished = Rc::new(Cell::new(false));
        Executor::spawn(&instance, {
            let finished = Rc::clone(&finished);
            async move {
                finished.set(true);
            }
        });

        Executor::flush_instance(&instance);
        assert!(finished.get());
        // `instance` goes out of scope here, before the next round starts.
    }
}
2003
/// A task awaiting `Signal::changed()` is spawned on an executor that is then
/// dropped without being flushed. Setting the signal afterwards must not
/// disturb a second, freshly created executor, which still has to run its own
/// task to completion.
#[test]
fn executor_drop_recreate_signal_subscriptions_cleaned_up() {
    init();

    let signal = Signal::new(0i32);

    {
        let first = Executor::new_instance();
        Executor::install_flush_scheduler(&first, Rc::new(TestScheduleFlush));

        let watched = signal.clone();
        Executor::spawn(&first, async move {
            watched.changed().await;
        });
        // `first` is dropped here, with the waiting task still on it.
    }

    let second = Executor::new_instance();
    Executor::install_flush_scheduler(&second, Rc::new(TestScheduleFlush));

    // Fire the signal the dropped executor's task was waiting on.
    signal.set(42);

    let finished = Rc::new(Cell::new(false));
    Executor::spawn(&second, {
        let finished = Rc::clone(&finished);
        async move {
            finished.set(true);
        }
    });
    Executor::flush_instance(&second);
    assert!(finished.get());
}
2046
/// Two instance executors share one test clock. One holds a 100 ms sleeper,
/// the other a 50 ms sleeper. After advancing by 60 and flushing only the
/// second executor, only the 50 ms task may be done; after another 60 and a
/// flush of the first executor, the 100 ms task must be done too.
#[test]
fn multiple_instance_executors_independent_timers() {
    init();
    let clock = Rc::new(TestTimeSource::new(0));
    init_time_source(Rc::clone(&clock) as Rc<dyn TimeSource>);

    // Both executors get the same setup: a flush scheduler plus the shared clock.
    let new_instance = || {
        let instance = Executor::new_instance();
        Executor::install_flush_scheduler(&instance, Rc::new(TestScheduleFlush));
        Executor::install_time_source(&instance, Rc::clone(&clock) as Rc<dyn TimeSource>);
        instance
    };
    let slow_exec = new_instance();
    let fast_exec = new_instance();

    let slow_done = Rc::new(Cell::new(false));
    let fast_done = Rc::new(Cell::new(false));
    let slow_flag = Rc::clone(&slow_done);
    let fast_flag = Rc::clone(&fast_done);

    Executor::spawn(&slow_exec, async move {
        timer::sleep(Duration::from_millis(100)).await;
        slow_flag.set(true);
    });
    Executor::spawn(&fast_exec, async move {
        timer::sleep(Duration::from_millis(50)).await;
        fast_flag.set(true);
    });

    // Neither deadline has been reached at time 0.
    assert!(!slow_done.get());
    assert!(!fast_done.get());

    // Clock at 60: past the 50 ms deadline only; only that executor is flushed.
    clock.advance(60);
    Executor::flush_instance(&fast_exec);
    assert!(!slow_done.get());
    assert!(fast_done.get());

    // Clock at 120: past the 100 ms deadline as well.
    clock.advance(60);
    Executor::flush_instance(&slow_exec);
    assert!(slow_done.get());
}
2092}