1use std::any::{Any, TypeId};
6use std::cell::{Cell, RefCell};
7use std::collections::{HashMap, VecDeque};
8use std::future::Future;
9use std::pin::Pin;
10use std::rc::{Rc, Weak};
11
12use crate::executor;
13use crate::Priority;
14
/// Identifier for a `TaskScope`; unique per thread.
type ScopeId = u64;
/// Identifier for a single spawned task within the executor.
type TaskId = u64;

thread_local! {
    // Next scope id to hand out on this thread; ids start at 1.
    static NEXT_SCOPE_ID: Cell<ScopeId> = const { Cell::new(1) };
}

/// Returns a fresh, thread-unique scope id.
fn alloc_scope_id() -> ScopeId {
    // `Cell::replace` stores the incremented counter and hands back the
    // previous value in one step.
    NEXT_SCOPE_ID.with(|counter| counter.replace(counter.get() + 1))
}
33
/// RAII guard that runs a cleanup closure exactly once when dropped.
///
/// `TaskScope` stores these so that external callback registrations are
/// unregistered when the scope is cancelled (dropping the handle runs the
/// cleanup).
pub struct CallbackHandle {
    // `None` for no-op handles and after `Drop` has taken the closure.
    cleanup: Option<Box<dyn FnOnce() + 'static>>,
}

impl CallbackHandle {
    /// Wraps `cleanup` so it runs when the returned handle is dropped.
    ///
    /// `#[must_use]` because discarding the handle drops it immediately,
    /// which runs the cleanup right away — almost certainly not intended.
    /// (Previously only `noop` carried the attribute; `new` is the variant
    /// where ignoring the result is actually a bug.)
    #[must_use]
    pub fn new(cleanup: impl FnOnce() + 'static) -> Self {
        Self {
            cleanup: Some(Box::new(cleanup)),
        }
    }

    /// Returns a handle that does nothing on drop.
    #[must_use]
    pub fn noop() -> Self {
        Self { cleanup: None }
    }
}

impl Drop for CallbackHandle {
    fn drop(&mut self) {
        // `take` guarantees the closure runs at most once.
        if let Some(f) = self.cleanup.take() {
            f();
        }
    }
}
74
75type ScopeRegistryEntry = (Weak<RefCell<TaskScopeInner>>, Weak<Cell<bool>>);
96
97thread_local! {
98 static SCOPE_REGISTRY: RefCell<HashMap<ScopeId, ScopeRegistryEntry>> =
99 RefCell::new(HashMap::new());
100}
101
102fn register_scope(id: ScopeId, inner: &Rc<RefCell<TaskScopeInner>>, suspended: &Rc<Cell<bool>>) {
105 let _ = SCOPE_REGISTRY.try_with(|reg| {
106 if let Ok(mut r) = reg.try_borrow_mut() {
107 r.insert(id, (Rc::downgrade(inner), Rc::downgrade(suspended)));
108 }
109 });
110}
111
112fn unregister_scope(id: ScopeId) {
113 let _ = SCOPE_REGISTRY.try_with(|reg| {
114 if let Ok(mut r) = reg.try_borrow_mut() {
115 r.remove(&id);
116 }
117 });
118}
119
120#[must_use]
124pub fn find_scope(scope_id: ScopeId) -> Option<TaskScope> {
125 SCOPE_REGISTRY
126 .try_with(|reg| {
127 if let Ok(r) = reg.try_borrow() {
128 r.get(&scope_id).and_then(|(inner_weak, suspended_weak)| {
129 let inner = inner_weak.upgrade()?;
130 let suspended = suspended_weak.upgrade()?;
131 Some(TaskScope { inner, suspended })
132 })
133 } else {
134 None
135 }
136 })
137 .ok()
138 .flatten()
139}
140
#[cfg(feature = "debug")]
#[doc(hidden)]
#[must_use]
/// Returns the debug label of the scope with `scope_id`, if the scope is
/// alive and a label was set via `set_debug_label`.
pub fn scope_debug_label(scope_id: ScopeId) -> Option<String> {
    let scope = find_scope(scope_id)?;
    // Clone the label out so no borrow of the scope escapes this function.
    let label = scope.inner.borrow().debug_label.clone();
    label
}
150
151#[doc(hidden)]
153pub fn clear_scope_registry() {
154 let _ = SCOPE_REGISTRY.try_with(|reg| {
155 if let Ok(mut r) = reg.try_borrow_mut() {
156 r.clear();
157 }
158 });
159}
160
/// Function pointer that installs (or clears, with `None`) the ambient
/// current scope.
type ScopeSetFn = fn(Option<TaskScope>);
/// Function pointer that reads a clone of the ambient current scope.
type ScopeGetFn = fn() -> Option<TaskScope>;

/// Pluggable storage backend for the "current scope" — e.g. plain
/// thread-local storage (the default) or a Tokio task-local
/// (`init_scope_store_tokio`).
pub struct ScopeStore {
    /// Stores the current scope.
    pub set_fn: ScopeSetFn,
    /// Reads the current scope.
    pub get_fn: ScopeGetFn,
}

use std::sync::OnceLock;
/// Process-wide selected store; initialized at most once (first writer wins).
static SCOPE_STORE: OnceLock<ScopeStore> = OnceLock::new();
188
/// Returns the installed `ScopeStore`, lazily installing the thread-local
/// default if none has been set yet.
fn ensure_default_store() -> &'static ScopeStore {
    SCOPE_STORE.get_or_init(|| ScopeStore {
        set_fn: thread_local_set,
        get_fn: thread_local_get,
    })
}

/// Installs a custom `ScopeStore`.
///
/// First initialization wins (`OnceLock::set`): if any scope API already
/// ran — which installs the default via `ensure_default_store` — this call
/// is silently ignored. Install the store before using any scope APIs.
pub fn set_scope_store(store: ScopeStore) {
    let _ = SCOPE_STORE.set(store);
}
214
thread_local! {
    // Default backing slot for the ambient current scope (used by the
    // default `ScopeStore`).
    static CURRENT_SCOPE: RefCell<Option<TaskScope>> = const { RefCell::new(None) };
}
244
245fn thread_local_set(scope: Option<TaskScope>) {
246 CURRENT_SCOPE.with(|cell| {
247 cell.replace(scope);
248 });
249}
250
251fn thread_local_get() -> Option<TaskScope> {
252 CURRENT_SCOPE.with(|cell| cell.borrow().clone())
253}
254
255pub(crate) fn set_scope_direct(scope: Option<TaskScope>) {
260 let store = ensure_default_store();
261 (store.set_fn)(scope);
262}
263
264pub(crate) fn get_scope_direct() -> Option<TaskScope> {
266 let store = ensure_default_store();
267 (store.get_fn)()
268}
269
#[cfg(feature = "ssr-tokio")]
/// Installs a Tokio task-local `ScopeStore`, so the current scope is kept
/// per Tokio task rather than per OS thread.
///
/// NOTE(review): a `tokio::task_local!` slot is only populated inside a
/// `LocalKey::scope(..)` / `sync_scope(..)` region — presumably the SSR
/// runtime establishes that around request handling; confirm. Outside such
/// a region `try_with` fails and set/get become no-ops returning `None`.
/// Also note `set_scope_store` is first-init-wins, so this must run before
/// any other scope API touches the store.
pub fn init_scope_store_tokio() {
    tokio::task_local! {
        static TK_SCOPE: std::cell::RefCell<Option<TaskScope>>;
    }

    // Best-effort reset in case we are already inside a task that has the
    // slot established.
    let _ = TK_SCOPE.try_with(|cell| {
        cell.replace(None);
    });

    set_scope_store(ScopeStore {
        set_fn: |s| {
            // Outside a task-local scope `try_with` errs; the value is dropped.
            let _ = TK_SCOPE.try_with(|cell| {
                cell.replace(s);
            });
        },
        get_fn: || {
            TK_SCOPE
                .try_with(|cell| cell.borrow().clone())
                .ok()
                .flatten()
        },
    });
}
310
311pub fn with_current_scope<R>(scope: &TaskScope, f: impl FnOnce() -> R) -> R {
321 let store = ensure_default_store();
322 let prev = (store.get_fn)();
323 (store.set_fn)(Some(scope.clone_inner()));
324 let result = f();
325 (store.set_fn)(prev);
326 result
327}
328
329#[must_use]
331pub fn current_scope() -> Option<TaskScope> {
332 let store = ensure_default_store();
333 (store.get_fn)()
334}
335
/// Shared state behind a `TaskScope` handle.
struct TaskScopeInner {
    /// Unique id; key into the scope registry and the executor's task lists.
    id: ScopeId,
    /// Ids of tasks spawned in this scope; used to cancel them on drop.
    task_ids: Vec<TaskId>,
    /// Strong handles to child scopes; cancelled together with this scope.
    children: Vec<TaskScope>,
    /// Weak link to the parent, walked upward by `consume` for context lookup.
    parent: Option<Weak<RefCell<TaskScopeInner>>>,
    /// Type-keyed context values. Nested `RefCell` so values can be inserted
    /// or read while the outer struct is only borrowed immutably.
    context: RefCell<HashMap<TypeId, Rc<dyn Any>>>,
    /// Cleanup handles; clearing the vec drops them, running their cleanups.
    callbacks: RefCell<Vec<CallbackHandle>>,
    /// Optional human-readable label for diagnostics.
    #[cfg(feature = "debug")]
    debug_label: Option<String>,
    /// Set once during cancellation; guards against cancelling twice.
    cancelled: bool,
}
355
#[must_use]
/// Handle to a cancellation scope for spawned tasks, child scopes, context
/// values, and callback cleanups.
///
/// NOTE(review): `Clone` is shallow (`Rc`-based) but `Drop` attempts
/// cancellation from *any* handle (gated only by `try_borrow_mut` and the
/// `cancelled` flag — see the `Drop` impl). Confirm intended semantics
/// before letting clones escape.
pub struct TaskScope {
    /// Shared mutable scope state.
    inner: Rc<RefCell<TaskScopeInner>>,
    /// Suspension flag, shared weakly with the registry (see `register_scope`)
    /// so it can be read without borrowing `inner`.
    suspended: Rc<Cell<bool>>,
}
390
impl TaskScope {
    /// Creates a new root scope and registers it in the thread-local
    /// scope registry.
    pub fn new() -> Self {
        let inner = Rc::new(RefCell::new(TaskScopeInner {
            id: alloc_scope_id(),
            task_ids: Vec::new(),
            children: Vec::new(),
            parent: None,
            context: RefCell::new(HashMap::new()),
            callbacks: RefCell::new(Vec::new()),
            #[cfg(feature = "debug")]
            debug_label: None,
            cancelled: false,
        }));
        let id = inner.borrow().id;
        let suspended = Rc::new(Cell::new(false));
        register_scope(id, &inner, &suspended);
        Self { inner, suspended }
    }

    /// Creates a child scope linked to `parent`: the child keeps a weak
    /// back-reference (used by `consume` for context lookup) and the parent
    /// keeps a strong handle, so cancelling the parent cancels the child.
    pub fn new_child(parent: &Self) -> Self {
        let inner = Rc::new(RefCell::new(TaskScopeInner {
            id: alloc_scope_id(),
            task_ids: Vec::new(),
            children: Vec::new(),
            parent: Some(Rc::downgrade(&parent.inner)),
            context: RefCell::new(HashMap::new()),
            callbacks: RefCell::new(Vec::new()),
            #[cfg(feature = "debug")]
            debug_label: None,
            cancelled: false,
        }));
        let id = inner.borrow().id;
        let suspended = Rc::new(Cell::new(false));
        register_scope(id, &inner, &suspended);
        let child = Self { inner, suspended };
        parent.inner.borrow_mut().children.push(child.clone_inner());
        child
    }

    /// Spawns `future` in this scope at `Priority::Low`.
    pub fn spawn(&self, future: impl Future<Output = ()> + 'static) {
        self.spawn_with_priority(Priority::Low, future);
    }

    /// Spawns `future` in this scope at the given priority; silently a
    /// no-op if the scope has already been cancelled.
    pub fn spawn_with_priority(
        &self,
        priority: Priority,
        future: impl Future<Output = ()> + 'static,
    ) {
        // NOTE(review): the mutable borrow appears deliberately held across
        // `spawn_scoped`: `with_current_scope` drops a temporary clone of
        // this handle when restoring the previous scope, and while this
        // borrow is active that clone's `Drop` fails its `try_borrow_mut`
        // and therefore does NOT cancel the scope. Do not "fix" by releasing
        // the borrow earlier without revisiting the `Drop` guard.
        let mut inner = self.inner.borrow_mut();
        if inner.cancelled {
            return;
        }
        let task_id =
            with_current_scope(self, || executor::spawn_scoped(priority, inner.id, future));
        inner.task_ids.push(task_id);
    }

    /// Ties `handle` to this scope's lifetime: it is dropped (running its
    /// cleanup) when the scope is cancelled.
    ///
    /// If the scope is already cancelled, `handle` is dropped here, which
    /// runs its cleanup immediately.
    pub fn register_callback_handle(&self, handle: CallbackHandle) {
        let inner = self.inner.borrow();
        if inner.cancelled {
            return;
        }
        inner.callbacks.borrow_mut().push(handle);
    }

    /// Stores `value` in this scope's context, keyed by its type.
    /// Replaces any previously provided value of the same type.
    pub fn provide<T: 'static>(&self, value: T) {
        self.inner
            .borrow()
            .context
            .borrow_mut()
            .insert(TypeId::of::<T>(), Rc::new(value));
    }

    #[must_use]
    /// Looks up a context value of type `T`, walking from this scope up
    /// through its ancestors; the nearest provider wins (shadowing).
    pub fn consume<T: 'static>(&self) -> Option<Rc<T>> {
        let mut current = Some(Rc::clone(&self.inner));

        while let Some(inner) = current {
            {
                let inner_ref = inner.borrow();
                let ctx = inner_ref.context.borrow();
                if let Some(val) = ctx.get(&TypeId::of::<T>()) {
                    if let Ok(downcast) = val.clone().downcast::<T>() {
                        return Some(downcast);
                    }
                }
            }

            // Borrow is scoped above so it is released before moving to the
            // parent. A dead parent (failed upgrade) ends the walk.
            let parent = {
                let inner_ref = inner.borrow();
                inner_ref.parent.as_ref().and_then(Weak::upgrade)
            };
            current = parent;
        }

        None
    }

    #[must_use]
    #[track_caller]
    /// Like `consume`, but panics with the type name if no ancestor
    /// provided a `T`.
    pub fn expect_context<T: 'static>(&self) -> Rc<T> {
        self.consume::<T>()
            .unwrap_or_else(|| panic!("context not found: {}", std::any::type_name::<T>()))
    }

    /// Sets a human-readable label, retrievable via `scope_debug_label`.
    #[cfg(feature = "debug")]
    pub fn set_debug_label(&self, label: impl Into<String>) {
        self.inner.borrow_mut().debug_label = Some(label.into());
    }

    /// Number of tasks ever spawned in this scope (test helper).
    #[cfg(test)]
    #[must_use]
    pub fn task_count(&self) -> usize {
        self.inner.borrow().task_ids.len()
    }

    /// Number of direct child scopes (test helper).
    #[cfg(test)]
    #[must_use]
    pub fn child_count(&self) -> usize {
        self.inner.borrow().children.len()
    }

    /// Shallow clone: the new handle shares this scope's state.
    fn clone_inner(&self) -> Self {
        Self {
            inner: Rc::clone(&self.inner),
            suspended: Rc::clone(&self.suspended),
        }
    }

    /// Runs `f` with this scope installed as the ambient current scope.
    pub fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
        with_current_scope(self, f)
    }

    /// Marks this scope and all descendants as suspended. Idempotent.
    ///
    /// NOTE(review): this only flips the shared flag; the executor
    /// presumably consults it (via the registry) to defer execution —
    /// confirm against the executor implementation.
    pub fn suspend(&self) {
        if self.suspended.get() {
            return;
        }
        self.suspended.set(true);
        // Clone the child handles out first so no borrow of `inner` is held
        // while recursing.
        let children: Vec<TaskScope> = {
            self.inner
                .borrow()
                .children
                .iter()
                .map(TaskScope::clone_inner)
                .collect()
        };
        for child in &children {
            child.suspend();
        }
    }

    /// Clears the suspended flag, re-enqueues this scope's deferred tasks
    /// with the executor, and resumes all descendants. Idempotent.
    pub fn resume(&self) {
        if !self.suspended.get() {
            return;
        }
        self.suspended.set(false);

        let (scope_id, children) = {
            let inner = self.inner.borrow();
            let id = inner.id;
            let children: Vec<TaskScope> =
                inner.children.iter().map(TaskScope::clone_inner).collect();
            (id, children)
        };

        executor::enqueue_scope_tasks(scope_id);

        for child in &children {
            child.resume();
        }
    }

    #[must_use]
    /// Whether this scope is currently suspended.
    pub fn is_suspended(&self) -> bool {
        self.suspended.get()
    }
}
637
638impl Default for TaskScope {
639 fn default() -> Self {
640 Self::new()
641 }
642}
643
644impl Clone for TaskScope {
645 fn clone(&self) -> Self {
646 self.clone_inner()
647 }
648}
649
impl Drop for TaskScope {
    /// Cancels the scope: runs callback cleanups, cancels all descendant
    /// scopes and their tasks (iteratively, deepest-first), then cancels
    /// this scope's own tasks and unregisters everything.
    fn drop(&mut self) {
        // If the inner state is already borrowed, skip cancellation
        // entirely. NOTE(review): this is what keeps temporary clones
        // harmless — e.g. the clone installed by `with_current_scope` inside
        // `spawn_with_priority` is dropped while that method still holds the
        // mutable borrow, so it must not cancel the scope.
        let Ok(mut inner) = self.inner.try_borrow_mut() else {
            return;
        };
        // Only the first handle to reach this point performs cancellation.
        if inner.cancelled {
            return;
        }
        inner.cancelled = true;

        // Drop callback handles (running their cleanups) BEFORE cancelling
        // tasks — `callback_handle_dropped_before_tasks` pins this order.
        inner.callbacks.borrow_mut().clear();

        // Collect all descendants breadth-first with an explicit queue, so
        // deeply nested trees cannot overflow the stack (see the
        // `deeply_nested_scope_drop_no_stack_overflow` test).
        let mut descendants: Vec<Rc<RefCell<TaskScopeInner>>> = Vec::new();
        {
            let mut queue: VecDeque<Rc<RefCell<TaskScopeInner>>> = VecDeque::new();
            for child in &inner.children {
                queue.push_back(Rc::clone(&child.inner));
            }

            while let Some(scope_rc) = queue.pop_front() {
                let scope = scope_rc.borrow();
                for child in &scope.children {
                    queue.push_back(Rc::clone(&child.inner));
                }
                descendants.push(Rc::clone(&scope_rc));
            }
        }

        // Cancel descendants in reverse BFS order (deepest levels first),
        // each with the same callbacks-then-tasks ordering as above.
        for scope_rc in descendants.iter().rev() {
            let mut scope = scope_rc.borrow_mut();
            if scope.cancelled {
                continue;
            }
            scope.cancelled = true;

            scope.callbacks.borrow_mut().clear();

            if !scope.task_ids.is_empty() {
                let dropped_futures: Vec<Pin<Box<dyn Future<Output = ()>>>> =
                    executor::cancel_scope_tasks(scope.id);
                // Dropping the futures here runs their guards' Drop impls.
                drop(dropped_futures);
            }
            scope.context.borrow_mut().clear();

            unregister_scope(scope.id);
        }

        // Finally, cancel this scope's own tasks and release its state.
        if !inner.task_ids.is_empty() {
            let dropped_futures = executor::cancel_scope_tasks(inner.id);
            drop(dropped_futures);
        }

        inner.context.borrow_mut().clear();
        inner.children.clear();

        unregister_scope(inner.id);
    }
}
727
/// Convenience wrapper around `TaskScope::provide`:
/// `provide_context!(scope, value)` stores `value` in `scope`'s context.
#[macro_export]
macro_rules! provide_context {
    ($scope:expr, $value:expr) => {
        $scope.provide($value)
    };
}
743
/// Convenience wrapper around `TaskScope::consume`:
/// `consume_context!(scope, T)` yields `Option<Rc<T>>` from `scope` or its
/// ancestors.
#[macro_export]
macro_rules! consume_context {
    ($scope:expr, $ty:ty) => {
        $scope.consume::<$ty>()
    };
}
755
#[cfg(test)]
#[allow(clippy::items_after_statements)]
mod tests {
    use super::*;
    use crate::executor::{self, init_flush_scheduler, reset_executor_for_test, TestScheduleFlush};
    use crate::{init_time_source, ScheduleFlush, TestTimeSource};
    use std::cell::{Cell, RefCell};
    use std::rc::Rc;

    /// Resets executor global state and installs the synchronous test flusher.
    fn init() {
        reset_executor_for_test();
        init_flush_scheduler(Rc::new(TestScheduleFlush));
    }
773
    // A fresh scope has no tasks and no children.
    #[test]
    fn new_scope_has_zero_tasks() {
        let scope = TaskScope::new();
        assert_eq!(scope.task_count(), 0);
        assert_eq!(scope.child_count(), 0);
    }

    // new_child records the child in the parent's children list.
    #[test]
    fn new_child_attaches_to_parent() {
        let parent = TaskScope::new();
        let _child = TaskScope::new_child(&parent);
        assert_eq!(parent.child_count(), 1);
    }

    // Spawning registers the task id with the scope.
    #[test]
    fn spawn_adds_task() {
        init();
        let scope = TaskScope::new();
        scope.spawn(async {});
        assert_eq!(scope.task_count(), 1);
    }

    // With the test flusher, a ready global task completes synchronously.
    #[test]
    fn spawn_and_complete() {
        init();
        let done = Rc::new(Cell::new(false));
        let done2 = Rc::clone(&done);
        spawn_global(async move {
            done2.set(true);
        });
        assert!(done.get());
    }

    // Dropping the scope cancels (drops) its pending task.
    #[test]
    fn scope_spawn_and_cancel() {
        init();
        let dropped = Rc::new(Cell::new(false));
        {
            let scope = TaskScope::new();
            let d = Rc::clone(&dropped);
            struct DropCheck(Rc<Cell<bool>>);
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.0.set(true);
                }
            }
            scope.spawn(async move {
                let _guard = DropCheck(d);
                std::future::pending::<()>().await;
            });
            assert_eq!(executor::debug_task_count(), 1);
        }
        assert!(dropped.get());
        assert_eq!(executor::debug_task_count(), 0);
    }

    // Dropping a parent also cancels its child's pending tasks.
    #[test]
    fn nested_scope_child_cancel_with_parent() {
        init();
        let dropped_child = Rc::new(Cell::new(false));
        {
            let parent = TaskScope::new();
            let child = TaskScope::new_child(&parent);
            let d = Rc::clone(&dropped_child);
            struct DropCheck(Rc<Cell<bool>>);
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.0.set(true);
                }
            }
            child.spawn(async move {
                let _guard = DropCheck(d);
                std::future::pending::<()>().await;
            });
            assert_eq!(executor::debug_task_count(), 1);
        }
        assert!(dropped_child.get());
        assert_eq!(executor::debug_task_count(), 0);
    }
855
856 #[test]
857 fn deeply_nested_scope_drop_no_stack_overflow() {
858 init();
859 let root = TaskScope::new();
860 {
861 let mut current = TaskScope::new_child(&root);
862 for _ in 0..199 {
863 current = TaskScope::new_child(¤t);
864 }
865 }
866 drop(root);
867 assert_eq!(executor::debug_task_count(), 0);
868 }
869
    // Child counts reflect only direct children, not grandchildren.
    #[test]
    fn scope_child_explicit_tree() {
        let root = TaskScope::new();
        let a = TaskScope::new_child(&root);
        let b = TaskScope::new_child(&root);
        let _a1 = TaskScope::new_child(&a);
        let _a2 = TaskScope::new_child(&a);
        assert_eq!(root.child_count(), 2);
        assert_eq!(a.child_count(), 2);
        assert_eq!(b.child_count(), 0);
    }

    // Cancellation order contract: callback cleanups run before task drops.
    #[test]
    fn callback_handle_dropped_before_tasks() {
        init();
        let dropped_order: Rc<RefCell<Vec<String>>> = Rc::new(RefCell::new(Vec::new()));
        {
            let scope = TaskScope::new();
            let order1 = Rc::clone(&dropped_order);
            scope.register_callback_handle(CallbackHandle::new(move || {
                order1.borrow_mut().push("callback".to_string());
            }));
            let order2 = Rc::clone(&dropped_order);
            struct DropCheck {
                order: Rc<RefCell<Vec<String>>>,
                label: String,
            }
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.order.borrow_mut().push(self.label.clone());
                }
            }
            scope.spawn(async move {
                let _guard = DropCheck {
                    order: order2,
                    label: "task".to_string(),
                };
                std::future::pending::<()>().await;
            });
        }
        let order = dropped_order.borrow().clone();
        assert_eq!(order, vec!["callback", "task"]);
    }

    // A callback registered on a child runs when the parent drops.
    #[test]
    fn callback_handle_cleaned_up_on_child_scope_drop() {
        init();
        let called = Rc::new(Cell::new(false));
        {
            let parent = TaskScope::new();
            let child = TaskScope::new_child(&parent);
            let c = Rc::clone(&called);
            child.register_callback_handle(CallbackHandle::new(move || {
                c.set(true);
            }));
        }
        assert!(called.get());
    }

    // provide/consume round-trip within one scope.
    #[test]
    fn context_provide_and_consume_in_same_scope() {
        let scope = TaskScope::new();
        scope.provide(42i32);
        assert_eq!(*scope.consume::<i32>().unwrap(), 42);
    }

    // consume falls back to ancestor-provided values.
    #[test]
    fn context_consume_walks_up_to_parent() {
        let parent = TaskScope::new();
        parent.provide("hello".to_string());
        let child = TaskScope::new_child(&parent);
        assert_eq!(*child.consume::<String>().unwrap(), "hello");
    }

    // consume of a never-provided type yields None.
    #[test]
    fn context_consume_not_found() {
        let scope = TaskScope::new();
        assert!(scope.consume::<i32>().is_none());
    }

    // Dropping a child does not disturb the parent's context.
    #[test]
    fn context_removed_on_scope_drop() {
        let parent = TaskScope::new();
        parent.provide(99u32);
        {
            let _child = TaskScope::new_child(&parent);
        }
        assert_eq!(*parent.consume::<u32>().unwrap(), 99);
    }

    // Child-provided values shadow the parent's without mutating it.
    #[test]
    fn context_shadowing() {
        let parent = TaskScope::new();
        parent.provide(1i32);
        let child = TaskScope::new_child(&parent);
        child.provide(2i32);
        assert_eq!(*child.consume::<i32>().unwrap(), 2);
        assert_eq!(*parent.consume::<i32>().unwrap(), 1);
    }

    // expect_context panics with a "context not found" message when missing.
    #[test]
    #[should_panic(expected = "context not found")]
    fn expect_context_panics_when_missing() {
        let scope = TaskScope::new();
        let _ = scope.expect_context::<String>();
    }
985
    // High-priority tasks run before low-priority ones on flush.
    #[test]
    fn executor_priority_ordering() {
        init();
        let order = Rc::new(RefCell::new(Vec::new()));
        let o1 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::Low, async move {
            o1.borrow_mut().push("low");
        });
        let o2 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::High, async move {
            o2.borrow_mut().push("high");
        });
        executor::flush_all();
        let result = order.borrow().clone();
        assert_eq!(result, vec!["high", "low"]);
    }

    // Many global tasks all complete and leave no residue.
    #[test]
    fn executor_batch() {
        init();
        let counter = Rc::new(Cell::new(0u32));
        for _ in 0..10 {
            let c = Rc::clone(&counter);
            spawn_global(async move {
                c.set(c.get() + 1);
            });
        }
        assert_eq!(counter.get(), 10);
        assert_eq!(executor::debug_task_count(), 0);
    }

    // Repeated create-spawn-drop cycles must not leak executor tasks.
    #[test]
    fn no_leak_on_cancel() {
        init();
        for _ in 0..50 {
            let scope = TaskScope::new();
            for _ in 0..5 {
                scope.spawn(std::future::pending::<()>());
            }
        }
        assert_eq!(executor::debug_task_count(), 0);
    }

    // set_deferred applies immediately (readable) and is visible to tasks.
    #[test]
    fn set_deferred_triggers_after_flush() {
        use auralis_signal::Signal;
        init();
        let sig = Signal::new(0);
        let observed = Rc::new(Cell::new(0));
        set_deferred(&sig, 42);
        assert_eq!(sig.read(), 42);
        let ob1 = Rc::clone(&observed);
        spawn_global(async move {
            ob1.set(sig.read());
        });
        assert_eq!(observed.get(), 42);
    }

    // set_deferred may be called from a Drop impl without panicking.
    #[test]
    fn set_deferred_in_drop_safe() {
        use auralis_signal::Signal;
        init();
        let sig = Signal::new(0);
        struct SetOnDrop {
            sig: Signal<i32>,
            val: i32,
        }
        impl Drop for SetOnDrop {
            fn drop(&mut self) {
                set_deferred(&self.sig, self.val);
            }
        }
        let guard = SetOnDrop {
            sig: sig.clone(),
            val: 99,
        };
        drop(guard);
        assert_eq!(sig.read(), 99);
    }

    // A Drop guard inside a cancelled task can still call set_deferred.
    #[test]
    fn set_deferred_from_drop_guard_during_scope_cancel() {
        use auralis_signal::Signal;
        init();

        let sig = Signal::new(0i32);

        struct ResetOnDrop {
            sig: Signal<i32>,
        }
        impl Drop for ResetOnDrop {
            fn drop(&mut self) {
                set_deferred(&self.sig, 42);
            }
        }

        {
            let scope = TaskScope::new();
            let s = sig.clone();
            scope.spawn(async move {
                let _guard = ResetOnDrop { sig: s };
                std::future::pending::<()>().await;
            });
        }

        assert_eq!(
            sig.read(),
            42,
            "set_deferred should have fired after scope cancel"
        );
    }
1106
    // yield_now lets same-priority tasks interleave.
    #[test]
    fn yield_now_gives_other_tasks_a_turn() {
        init();
        let order = Rc::new(RefCell::new(Vec::new()));
        let o1 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::Low, async move {
            o1.borrow_mut().push("a1");
            executor::yield_now().await;
            o1.borrow_mut().push("a2");
        });
        let o2 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::Low, async move {
            o2.borrow_mut().push("b1");
            o2.borrow_mut().push("b2");
        });
        executor::flush_all();
        let r = order.borrow().clone();
        assert_eq!(&r[0..3], &["a1", "b1", "b2"][..]);
        assert!(r.contains(&"a2"));
    }

    // A panicking task must not take down subsequently spawned tasks.
    #[test]
    fn panic_in_task_is_isolated() {
        init();
        let survived = Rc::new(Cell::new(false));
        let s = Rc::clone(&survived);
        spawn_global(async move {
            panic!("intentional test panic");
        });
        spawn_global(async move {
            s.set(true);
        });
        assert!(survived.get());
        assert_eq!(executor::debug_task_count(), 0);
    }

    // With the synchronous test flusher, all 50 tasks complete even though
    // the fake clock advances 1 per task.
    #[test]
    fn time_budget_with_test_time_source() {
        init();
        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(ts.clone());

        let polled = Rc::new(Cell::new(0u32));

        for _ in 0..50 {
            let pc = Rc::clone(&polled);
            let ts_c = Rc::clone(&ts);
            executor::spawn_no_auto_flush(Priority::Low, async move {
                pc.set(pc.get() + 1);
                ts_c.advance(1);
            });
        }

        executor::flush_all();

        assert_eq!(polled.get(), 50);
        assert_eq!(executor::debug_task_count(), 0);
    }

    // With a flusher that only counts (never re-runs), the time budget must
    // split the flush: some tasks run, one continuation is scheduled.
    #[test]
    fn time_budget_honoured_with_split() {
        let schedule_count = Rc::new(Cell::new(0u32));
        struct NoopScheduleFlush(Rc<Cell<u32>>);
        impl ScheduleFlush for NoopScheduleFlush {
            fn schedule(&self, _callback: Box<dyn FnOnce()>) {
                self.0.set(self.0.get() + 1);
            }
        }
        init_flush_scheduler(Rc::new(NoopScheduleFlush(Rc::clone(&schedule_count))));

        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(ts.clone());

        let polled = Rc::new(RefCell::new(Vec::new()));

        for i in 0..50u32 {
            let pc = Rc::clone(&polled);
            let ts_c = Rc::clone(&ts);
            executor::spawn_no_auto_flush(Priority::Low, async move {
                pc.borrow_mut().push(i);
                ts_c.advance(1);
            });
        }

        executor::flush_all();

        let completed = polled.borrow().len();
        assert!(
            completed < 50,
            "budget should split before all tasks run (only {completed} of 50)"
        );
        assert!(
            completed >= 7,
            "at least 7 tasks should run before budget expires ({completed})"
        );
        assert_eq!(
            schedule_count.get(),
            1,
            "next flush should have been scheduled exactly once"
        );

        // Restore the synchronous flusher and drain the leftovers.
        init_flush_scheduler(Rc::new(TestScheduleFlush));
        executor::flush_all();
        assert_eq!(executor::debug_task_count(), 0);
    }
1226
    // provide_context! forwards to TaskScope::provide.
    #[test]
    fn provide_context_macro_works() {
        let scope = TaskScope::new();
        provide_context!(scope, 42i32);
        assert_eq!(*scope.consume::<i32>().unwrap(), 42);
    }

    // consume_context! forwards to TaskScope::consume.
    #[test]
    fn consume_context_macro_works() {
        let scope = TaskScope::new();
        scope.provide(99u32);
        let val: Option<Rc<u32>> = consume_context!(scope, u32);
        assert_eq!(*val.unwrap(), 99);
    }

    #[test]
    fn consume_context_macro_not_found() {
        let scope = TaskScope::new();
        let val: Option<Rc<String>> = consume_context!(scope, String);
        assert!(val.is_none());
    }

    // dump_task_tree reports the active scope and its pending task.
    #[cfg(feature = "debug")]
    #[test]
    fn dump_task_tree_returns_string() {
        init();
        let scope = TaskScope::new();
        scope.spawn(async { std::future::pending::<()>().await });

        let output = crate::dump_task_tree();
        assert!(output.contains("Auralis Task Tree"));
        assert!(output.contains("Total active tasks: 1"));
        assert!(output.contains("Scope"));
    }

    #[cfg(feature = "debug")]
    #[test]
    fn dump_task_tree_empty() {
        init();
        let output = crate::dump_task_tree();
        assert!(output.contains("(no active tasks)"));
    }

    use crate::{set_deferred, spawn_global};
1275
    // Tasks spawned into a suspended scope are deferred, not executed.
    #[test]
    fn suspend_prevents_task_execution() {
        init();
        let scope = TaskScope::new();
        let executed = Rc::new(Cell::new(false));
        let ex = Rc::clone(&executed);
        scope.spawn(async move {
            ex.set(true);
        });
        assert!(executed.get());
        executed.set(false);

        scope.suspend();
        let ex2 = Rc::clone(&executed);
        scope.spawn(async move {
            ex2.set(true);
        });
        assert!(!executed.get());
    }

    // resume re-enqueues tasks deferred while suspended.
    #[test]
    fn resume_allows_task_execution() {
        init();
        let scope = TaskScope::new();
        scope.suspend();
        let executed = Rc::new(Cell::new(false));
        let ex = Rc::clone(&executed);
        scope.spawn(async move {
            ex.set(true);
        });
        assert!(!executed.get());

        scope.resume();
        assert!(executed.get());
    }

    // suspend propagates down the scope tree.
    #[test]
    fn suspend_cascades_to_children() {
        init();
        let parent = TaskScope::new();
        let child = TaskScope::new_child(&parent);
        assert!(!child.is_suspended());

        parent.suspend();
        assert!(parent.is_suspended());
        assert!(child.is_suspended());
    }

    // resume propagates down the scope tree.
    #[test]
    fn resume_cascades_to_children() {
        init();
        let parent = TaskScope::new();
        let child = TaskScope::new_child(&parent);
        parent.suspend();
        assert!(child.is_suspended());

        parent.resume();
        assert!(!parent.is_suspended());
        assert!(!child.is_suspended());
    }

    // suspend/resume are idempotent-safe across many cycles.
    #[test]
    fn multiple_suspend_resume_no_leak() {
        init();
        let scope = TaskScope::new();
        for _ in 0..50 {
            scope.suspend();
            assert!(scope.is_suspended());
            scope.resume();
            assert!(!scope.is_suspended());
        }
    }

    // Dropping a suspended scope still cancels its deferred task cleanly.
    #[test]
    fn suspended_scope_drops_without_panic() {
        init();
        {
            let scope = TaskScope::new();
            scope.suspend();
            let d = Rc::new(Cell::new(false));
            struct DropCheck(Rc<Cell<bool>>);
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.0.set(true);
                }
            }
            scope.spawn(async move {
                let _guard = DropCheck(d);
                std::future::pending::<()>().await;
            });
        }
        assert_eq!(executor::debug_task_count(), 0);
    }

    // Suspension is per-subtree: siblings and the parent are unaffected.
    #[test]
    fn siblings_not_affected_by_suspend() {
        init();
        let parent = TaskScope::new();
        let child_a = TaskScope::new_child(&parent);
        let child_b = TaskScope::new_child(&parent);

        child_a.suspend();
        assert!(child_a.is_suspended());
        assert!(!child_b.is_suspended());
        assert!(!parent.is_suspended());
    }

    use crate::Executor;

    // Instance executors isolate task panics just like the global one.
    #[test]
    fn flush_instance_panicking_task_is_isolated() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));

        let survived = Rc::new(Cell::new(false));
        let s = Rc::clone(&survived);

        Executor::spawn(&ex, async move {
            panic!("intentional test panic in instance executor");
        });
        Executor::spawn(&ex, async move {
            s.set(true);
        });
        Executor::flush_instance(&ex);

        assert!(survived.get());
    }

    // Instance executors run all queued tasks on flush.
    #[test]
    fn flush_instance_spawn_and_complete() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));

        let counter = Rc::new(Cell::new(0u32));
        for _ in 0..20 {
            let c = Rc::clone(&counter);
            Executor::spawn(&ex, async move {
                c.set(c.get() + 1);
            });
        }
        Executor::flush_instance(&ex);
        assert_eq!(counter.get(), 20);
    }
}