1use std::any::{Any, TypeId};
6use std::cell::{Cell, RefCell};
7use std::collections::{HashMap, VecDeque};
8use std::future::Future;
9use std::pin::Pin;
10use std::rc::{Rc, Weak};
11
12use crate::executor;
13use crate::Priority;
14
15type ScopeId = u64;
16type TaskId = u64;
17
thread_local! {
    // Monotonic per-thread counter used to hand out `ScopeId`s.
    // Starts at 1, so id 0 is never allocated.
    static NEXT_SCOPE_ID: Cell<ScopeId> = const { Cell::new(1) };
}
25
26fn alloc_scope_id() -> ScopeId {
27 NEXT_SCOPE_ID.with(|c| {
28 let id = c.get();
29 c.set(id + 1);
30 id
31 })
32}
33
/// RAII guard for an externally registered cleanup action: the stored
/// closure runs exactly once, when the handle is dropped (see `Drop` below).
pub struct CallbackHandle {
    // `None` for a `noop()` handle, or after the cleanup has already run.
    cleanup: Option<Box<dyn FnOnce() + 'static>>,
}
48
49impl CallbackHandle {
50 pub fn new(cleanup: impl FnOnce() + 'static) -> Self {
52 Self {
53 cleanup: Some(Box::new(cleanup)),
54 }
55 }
56
57 #[must_use]
62 pub fn noop() -> Self {
63 Self { cleanup: None }
64 }
65}
66
67impl Drop for CallbackHandle {
68 fn drop(&mut self) {
69 if let Some(f) = self.cleanup.take() {
70 f();
71 }
72 }
73}
74
75type ScopeRegistryEntry = (Weak<RefCell<TaskScopeInner>>, Weak<Cell<bool>>);
96
thread_local! {
    // Per-thread lookup table from `ScopeId` to weak handles of a scope's
    // shared state. Weak references keep the registry from extending a
    // scope's lifetime; entries are pruned in `TaskScope`'s Drop.
    static SCOPE_REGISTRY: RefCell<HashMap<ScopeId, ScopeRegistryEntry>> =
        RefCell::new(HashMap::new());
}
101
102fn register_scope(id: ScopeId, inner: &Rc<RefCell<TaskScopeInner>>, suspended: &Rc<Cell<bool>>) {
105 let _ = SCOPE_REGISTRY.try_with(|reg| {
106 if let Ok(mut r) = reg.try_borrow_mut() {
107 r.insert(id, (Rc::downgrade(inner), Rc::downgrade(suspended)));
108 }
109 });
110}
111
112fn unregister_scope(id: ScopeId) {
113 let _ = SCOPE_REGISTRY.try_with(|reg| {
114 if let Ok(mut r) = reg.try_borrow_mut() {
115 r.remove(&id);
116 }
117 });
118}
119
120#[must_use]
124pub fn find_scope(scope_id: ScopeId) -> Option<TaskScope> {
125 SCOPE_REGISTRY
126 .try_with(|reg| {
127 if let Ok(r) = reg.try_borrow() {
128 r.get(&scope_id).and_then(|(inner_weak, suspended_weak)| {
129 let inner = inner_weak.upgrade()?;
130 let suspended = suspended_weak.upgrade()?;
131 Some(TaskScope { inner, suspended })
132 })
133 } else {
134 None
135 }
136 })
137 .ok()
138 .flatten()
139}
140
/// Debug helper: returns the `debug_label` of a still-live scope, if set.
#[cfg(feature = "debug")]
#[doc(hidden)]
#[must_use]
pub fn scope_debug_label(scope_id: ScopeId) -> Option<String> {
    find_scope(scope_id).and_then(|s| s.inner.borrow().debug_label.clone())
}
150
151#[doc(hidden)]
153pub fn clear_scope_registry() {
154 let _ = SCOPE_REGISTRY.try_with(|reg| {
155 if let Ok(mut r) = reg.try_borrow_mut() {
156 r.clear();
157 }
158 });
159}
160
161type ScopeSetFn = fn(Option<TaskScope>);
171type ScopeGetFn = fn() -> Option<TaskScope>;
172
/// Pluggable storage strategy for the "current scope". The default
/// implementation keeps it in a thread-local (see `thread_local_set`/`get`);
/// embedders can install an alternative via `set_scope_store`.
pub struct ScopeStore {
    /// Installs (or clears, with `None`) the current scope.
    pub set_fn: ScopeSetFn,
    /// Reads back the current scope, if any.
    pub get_fn: ScopeGetFn,
}
185
186use std::sync::OnceLock;
187static SCOPE_STORE: OnceLock<ScopeStore> = OnceLock::new();
188
189fn ensure_default_store() -> &'static ScopeStore {
190 SCOPE_STORE.get_or_init(|| ScopeStore {
191 set_fn: thread_local_set,
192 get_fn: thread_local_get,
193 })
194}
195
/// Installs a custom scope store.
///
/// First caller wins: `OnceLock::set` fails once initialized (including by
/// the lazy default in `ensure_default_store`), and the error is
/// deliberately discarded, so later calls are silent no-ops.
pub fn set_scope_store(store: ScopeStore) {
    let _ = SCOPE_STORE.set(store);
}
214
thread_local! {
    // The scope considered "current" on this thread when the default
    // (thread-local) scope store is in use.
    static CURRENT_SCOPE: RefCell<Option<TaskScope>> = const { RefCell::new(None) };
}
244
245fn thread_local_set(scope: Option<TaskScope>) {
246 CURRENT_SCOPE.with(|cell| {
247 cell.replace(scope);
248 });
249}
250
251fn thread_local_get() -> Option<TaskScope> {
252 CURRENT_SCOPE.with(|cell| cell.borrow().clone())
253}
254
255pub(crate) fn set_scope_direct(scope: Option<TaskScope>) {
260 let store = ensure_default_store();
261 (store.set_fn)(scope);
262}
263
264pub(crate) fn get_scope_direct() -> Option<TaskScope> {
266 let store = ensure_default_store();
267 (store.get_fn)()
268}
269
/// Installs a `tokio::task_local!`-backed scope store, so each tokio task
/// (rather than each OS thread) carries its own current scope.
///
/// NOTE(review): every access goes through `LocalKey::try_with`, which
/// errors unless the task-local has been seeded for the current task (via
/// `TK_SCOPE.scope(..)`). Outside such a task scope the setter/getter are
/// silent no-ops — confirm callers establish the task-local scope.
#[cfg(feature = "ssr-tokio")]
pub fn init_scope_store_tokio() {
    tokio::task_local! {
        static TK_SCOPE: std::cell::RefCell<Option<TaskScope>>;
    }

    // Best-effort reset of any scope previously stored for this task.
    let _ = TK_SCOPE.try_with(|cell| {
        cell.replace(None);
    });

    // First-call-wins (OnceLock inside `set_scope_store`): this has no
    // effect if a store was already installed or lazily defaulted.
    set_scope_store(ScopeStore {
        set_fn: |s| {
            let _ = TK_SCOPE.try_with(|cell| {
                cell.replace(s);
            });
        },
        get_fn: || {
            TK_SCOPE
                .try_with(|cell| cell.borrow().clone())
                .ok()
                .flatten()
        },
    });
}
310
311pub fn with_current_scope<R>(scope: &TaskScope, f: impl FnOnce() -> R) -> R {
321 let store = ensure_default_store();
322 let prev = (store.get_fn)();
323 (store.set_fn)(Some(scope.clone_inner()));
324 let result = f();
325 (store.set_fn)(prev);
326 result
327}
328
329#[must_use]
331pub fn current_scope() -> Option<TaskScope> {
332 let store = ensure_default_store();
333 (store.get_fn)()
334}
335
/// Shared, single-threaded state behind a `TaskScope` handle.
struct TaskScopeInner {
    /// Unique id; also the key into `SCOPE_REGISTRY`.
    id: ScopeId,
    /// Ids of tasks spawned on this scope (cancelled in `Drop`).
    task_ids: Vec<TaskId>,
    /// Strong handles to child scopes; cleared when this scope drops.
    children: Vec<TaskScope>,
    /// Weak link to the parent, used by `consume` to walk up the tree.
    parent: Option<Weak<RefCell<TaskScopeInner>>>,
    /// Type-keyed context values (`provide`/`consume`).
    context: RefCell<HashMap<TypeId, Rc<dyn Any>>>,
    /// Cleanup handles; released before tasks are cancelled (see `Drop`).
    callbacks: RefCell<Vec<CallbackHandle>>,
    /// Optional human-readable label for debug dumps.
    #[cfg(feature = "debug")]
    debug_label: Option<String>,
    /// Set once the scope is cancelled; spawn/register become no-ops.
    cancelled: bool,
    /// Executor the scope's tasks run on.
    executor: executor::ExecutorRef,
}
360
/// Handle for a tree of tasks: dropping a handle cancels the scope's tasks,
/// runs its callback cleanups, and cascades to descendant scopes (see the
/// `Drop` impl for the exact ordering).
#[must_use]
pub struct TaskScope {
    /// Shared mutable state (id, tasks, children, context, …).
    inner: Rc<RefCell<TaskScopeInner>>,
    /// Suspension flag, also reachable through the scope registry.
    suspended: Rc<Cell<bool>>,
}
395
impl TaskScope {
    /// Creates a root scope bound to the thread's current executor.
    pub fn new() -> Self {
        Self::with_executor(&executor::current_executor_instance())
    }

    /// Creates a root scope bound to an explicit executor instance.
    pub fn with_executor(ex: &executor::ExecutorRef) -> Self {
        let inner = Rc::new(RefCell::new(TaskScopeInner {
            id: alloc_scope_id(),
            task_ids: Vec::new(),
            children: Vec::new(),
            parent: None,
            context: RefCell::new(HashMap::new()),
            callbacks: RefCell::new(Vec::new()),
            #[cfg(feature = "debug")]
            debug_label: None,
            cancelled: false,
            executor: Rc::clone(ex),
        }));
        let id = inner.borrow().id;
        let suspended = Rc::new(Cell::new(false));
        // Make the scope discoverable through `find_scope` while it lives.
        register_scope(id, &inner, &suspended);
        Self { inner, suspended }
    }

    /// Creates a child scope: it inherits `parent`'s executor, can consume
    /// `parent`'s context values, and is cancelled when `parent` drops.
    pub fn new_child(parent: &Self) -> Self {
        let ex = parent.inner.borrow().executor.clone();
        let inner = Rc::new(RefCell::new(TaskScopeInner {
            id: alloc_scope_id(),
            task_ids: Vec::new(),
            children: Vec::new(),
            parent: Some(Rc::downgrade(&parent.inner)),
            context: RefCell::new(HashMap::new()),
            callbacks: RefCell::new(Vec::new()),
            #[cfg(feature = "debug")]
            debug_label: None,
            cancelled: false,
            executor: ex,
        }));
        let id = inner.borrow().id;
        let suspended = Rc::new(Cell::new(false));
        register_scope(id, &inner, &suspended);
        let child = Self { inner, suspended };
        // The parent keeps a clone so its Drop can cascade to the child.
        parent.inner.borrow_mut().children.push(child.clone_inner());
        child
    }

    /// Spawns `future` on this scope at `Priority::Low`.
    pub fn spawn(&self, future: impl Future<Output = ()> + 'static) {
        self.spawn_with_priority(Priority::Low, future);
    }

    /// Spawns `future` on this scope's executor at `priority`.
    ///
    /// No-op if the scope has already been cancelled. The task is tagged
    /// with this scope's id so the scope's Drop can cancel it.
    pub fn spawn_with_priority(
        &self,
        priority: Priority,
        future: impl Future<Output = ()> + 'static,
    ) {
        let inner = self.inner.borrow();
        if inner.cancelled {
            return;
        }
        let ex = Rc::clone(&inner.executor);
        // Install this scope as "current" while spawning so code that runs
        // synchronously during the spawn can observe it.
        let task_id = executor::with_executor(&ex, || {
            with_current_scope(self, || {
                executor::spawn_scoped_on(&ex, priority, inner.id, future)
            })
        });
        // Release the shared borrow before taking the mutable one below.
        drop(inner);
        self.inner.borrow_mut().task_ids.push(task_id);
    }

    /// Ties `handle`'s cleanup to this scope's lifetime.
    ///
    /// If the scope is already cancelled the handle is dropped on return,
    /// which runs its cleanup immediately.
    pub fn register_callback_handle(&self, handle: CallbackHandle) {
        let inner = self.inner.borrow();
        if inner.cancelled {
            return;
        }
        inner.callbacks.borrow_mut().push(handle);
    }

    /// Stores `value` in this scope's context, keyed by its type. A later
    /// `provide` of the same type replaces the earlier value.
    pub fn provide<T: 'static>(&self, value: T) {
        self.inner
            .borrow()
            .context
            .borrow_mut()
            .insert(TypeId::of::<T>(), Rc::new(value));
    }

    /// Looks up a context value of type `T`, walking from this scope up
    /// through its ancestors; the nearest provider wins (shadowing).
    #[must_use]
    pub fn consume<T: 'static>(&self) -> Option<Rc<T>> {
        let mut current = Some(Rc::clone(&self.inner));

        while let Some(inner) = current {
            {
                let inner_ref = inner.borrow();
                let ctx = inner_ref.context.borrow();
                if let Some(val) = ctx.get(&TypeId::of::<T>()) {
                    if let Ok(downcast) = val.clone().downcast::<T>() {
                        return Some(downcast);
                    }
                }
            }

            // Ascend; the walk stops when there is no parent or the parent
            // has already been dropped (weak upgrade fails).
            let parent = {
                let inner_ref = inner.borrow();
                inner_ref.parent.as_ref().and_then(Weak::upgrade)
            };
            current = parent;
        }

        None
    }

    /// Like `consume`, but panics (naming the type) when nothing provides `T`.
    #[must_use]
    #[track_caller]
    pub fn expect_context<T: 'static>(&self) -> Rc<T> {
        self.consume::<T>()
            .unwrap_or_else(|| panic!("context not found: {}", std::any::type_name::<T>()))
    }

    /// Attaches a human-readable label used by debug dumps.
    #[cfg(feature = "debug")]
    pub fn set_debug_label(&self, label: impl Into<String>) {
        self.inner.borrow_mut().debug_label = Some(label.into());
    }

    /// Number of task ids recorded on this scope (test-only).
    #[cfg(test)]
    #[must_use]
    pub fn task_count(&self) -> usize {
        self.inner.borrow().task_ids.len()
    }

    /// Number of direct children (test-only).
    #[cfg(test)]
    #[must_use]
    pub fn child_count(&self) -> usize {
        self.inner.borrow().children.len()
    }

    /// Cheap handle clone sharing the same inner state and suspended flag.
    fn clone_inner(&self) -> Self {
        Self {
            inner: Rc::clone(&self.inner),
            suspended: Rc::clone(&self.suspended),
        }
    }

    /// Runs `f` with this scope installed as the current scope.
    pub fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
        with_current_scope(self, f)
    }

    /// Marks this scope and, recursively, its children as suspended.
    /// Idempotent: suspending an already-suspended scope is a no-op.
    ///
    /// NOTE(review): the temporary child clones collected below run
    /// `TaskScope::drop` when the Vec goes out of scope, and that Drop has
    /// no last-clone (`Rc::strong_count`) check — confirm this does not
    /// unintentionally cancel the children.
    pub fn suspend(&self) {
        if self.suspended.get() {
            return;
        }
        self.suspended.set(true);
        let children: Vec<TaskScope> = {
            self.inner
                .borrow()
                .children
                .iter()
                .map(TaskScope::clone_inner)
                .collect()
        };
        for child in &children {
            child.suspend();
        }
    }

    /// Clears the suspended flag, re-enqueues this scope's tasks on its
    /// executor, and resumes children recursively. No-op if not suspended.
    pub fn resume(&self) {
        if !self.suspended.get() {
            return;
        }
        self.suspended.set(false);

        // Snapshot id and children first so no borrow is held across the
        // executor call below.
        let (scope_id, children) = {
            let inner = self.inner.borrow();
            let id = inner.id;
            let children: Vec<TaskScope> =
                inner.children.iter().map(TaskScope::clone_inner).collect();
            (id, children)
        };

        let ex = Rc::clone(&self.inner.borrow().executor);
        executor::enqueue_scope_tasks_on(&ex, scope_id);

        for child in &children {
            child.resume();
        }
    }

    /// Whether this scope is currently suspended.
    #[must_use]
    pub fn is_suspended(&self) -> bool {
        self.suspended.get()
    }
}
659
660impl Default for TaskScope {
661 fn default() -> Self {
662 Self::new()
663 }
664}
665
666impl Clone for TaskScope {
667 fn clone(&self) -> Self {
668 self.clone_inner()
669 }
670}
671
impl Drop for TaskScope {
    /// Cancels the scope: runs callback cleanups first, then cancels
    /// descendant scopes (descendants before ancestors, iteratively), then
    /// this scope's own tasks, finally clearing context and registry entry.
    ///
    /// NOTE(review): this runs for *every* dropped `TaskScope` value — there
    /// is no `Rc::strong_count` check — even though the warning text below
    /// speaks of "the last clone". Confirm which semantics is intended.
    fn drop(&mut self) {
        // If the inner state is already borrowed (e.g. dropped from inside a
        // callback during a flush), cancelling safely is impossible: warn
        // (debug builds only) and bail rather than panic.
        let Ok(mut inner) = self.inner.try_borrow_mut() else {
            #[cfg(debug_assertions)]
            {
                eprintln!(
                    "[auralis-task] WARNING: TaskScope::drop cannot borrow inner \
                     (already borrowed). If this was the last clone, tasks and \
                     callbacks will leak. Avoid dropping the last TaskScope clone \
                     inside a callback or during executor flush."
                );
            }
            return;
        };
        // Idempotent: another handle may already have cancelled this scope.
        if inner.cancelled {
            return;
        }
        inner.cancelled = true;

        // Ordering contract: callback cleanups run before tasks are dropped.
        inner.callbacks.borrow_mut().clear();

        // Collect every descendant iteratively (BFS queue) so arbitrarily
        // deep scope trees cannot overflow the stack.
        let mut descendants: Vec<Rc<RefCell<TaskScopeInner>>> = Vec::new();
        {
            let mut queue: VecDeque<Rc<RefCell<TaskScopeInner>>> = VecDeque::new();
            for child in &inner.children {
                queue.push_back(Rc::clone(&child.inner));
            }

            while let Some(scope_rc) = queue.pop_front() {
                let scope = scope_rc.borrow();
                for child in &scope.children {
                    queue.push_back(Rc::clone(&child.inner));
                }
                descendants.push(Rc::clone(&scope_rc));
            }
        }

        // Reverse BFS order processes every descendant before its ancestor.
        for scope_rc in descendants.iter().rev() {
            let mut scope = scope_rc.borrow_mut();
            if scope.cancelled {
                continue;
            }
            scope.cancelled = true;

            // Same ordering contract per descendant: callbacks, then tasks.
            scope.callbacks.borrow_mut().clear();

            if !scope.task_ids.is_empty() {
                let ex = Rc::clone(&scope.executor);
                let dropped_futures: Vec<Pin<Box<dyn Future<Output = ()>>>> =
                    executor::cancel_scope_tasks_on(&ex, scope.id);
                // Explicit drop point for the cancelled futures.
                drop(dropped_futures);
            }
            scope.context.borrow_mut().clear();
            unregister_scope(scope.id);
        }

        // Finally cancel this scope's own tasks and release its state.
        if !inner.task_ids.is_empty() {
            let ex = Rc::clone(&inner.executor);
            let dropped_futures = executor::cancel_scope_tasks_on(&ex, inner.id);
            drop(dropped_futures);
        }

        inner.context.borrow_mut().clear();
        inner.children.clear();

        unregister_scope(inner.id);
    }
}
755
/// Convenience wrapper around [`TaskScope::provide`]:
/// `provide_context!(scope, value)`.
#[macro_export]
macro_rules! provide_context {
    ($scope:expr, $value:expr) => {
        $scope.provide($value)
    };
}
771
/// Convenience wrapper around [`TaskScope::consume`]:
/// `consume_context!(scope, Type)` yields `Option<Rc<Type>>`.
#[macro_export]
macro_rules! consume_context {
    ($scope:expr, $ty:ty) => {
        $scope.consume::<$ty>()
    };
}
783
784#[cfg(test)]
789#[allow(clippy::items_after_statements)]
790mod tests {
791 use super::*;
792 use crate::executor::{self, init_flush_scheduler, reset_executor_for_test, TestScheduleFlush};
793 use crate::{init_time_source, ScheduleFlush, TestTimeSource, TimeSource};
794 use auralis_signal::Signal;
795 use std::cell::{Cell, RefCell};
796 use std::rc::Rc;
797 use std::time::Duration;
798
    /// Shared test setup: fresh global executor plus the synchronous test
    /// flush scheduler (tests below assert tasks complete right after spawn).
    fn init() {
        reset_executor_for_test();
        init_flush_scheduler(Rc::new(TestScheduleFlush));
    }
803
    // --- Scope construction and drop-cancellation ---

    #[test]
    fn new_scope_has_zero_tasks() {
        let scope = TaskScope::new();
        assert_eq!(scope.task_count(), 0);
        assert_eq!(scope.child_count(), 0);
    }

    #[test]
    fn new_child_attaches_to_parent() {
        let parent = TaskScope::new();
        let _child = TaskScope::new_child(&parent);
        assert_eq!(parent.child_count(), 1);
    }

    #[test]
    fn spawn_adds_task() {
        init();
        let scope = TaskScope::new();
        scope.spawn(async {});
        assert_eq!(scope.task_count(), 1);
    }

    #[test]
    fn spawn_and_complete() {
        init();
        let done = Rc::new(Cell::new(false));
        let done2 = Rc::clone(&done);
        spawn_global(async move {
            done2.set(true);
        });
        // The test scheduler flushes synchronously, so the task already ran.
        assert!(done.get());
    }

    #[test]
    fn scope_spawn_and_cancel() {
        init();
        let dropped = Rc::new(Cell::new(false));
        {
            let scope = TaskScope::new();
            let d = Rc::clone(&dropped);
            // Sentinel recording when the pending future is dropped.
            struct DropCheck(Rc<Cell<bool>>);
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.0.set(true);
                }
            }
            scope.spawn(async move {
                let _guard = DropCheck(d);
                std::future::pending::<()>().await;
            });
            assert_eq!(executor::debug_task_count(), 1);
        }
        // Scope drop must cancel (and therefore drop) the pending task.
        assert!(dropped.get());
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn nested_scope_child_cancel_with_parent() {
        init();
        let dropped_child = Rc::new(Cell::new(false));
        {
            let parent = TaskScope::new();
            let child = TaskScope::new_child(&parent);
            let d = Rc::clone(&dropped_child);
            struct DropCheck(Rc<Cell<bool>>);
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.0.set(true);
                }
            }
            child.spawn(async move {
                let _guard = DropCheck(d);
                std::future::pending::<()>().await;
            });
            assert_eq!(executor::debug_task_count(), 1);
        }
        // Parent drop must cascade cancellation to the child's task.
        assert!(dropped_child.get());
        assert_eq!(executor::debug_task_count(), 0);
    }
885
886 #[test]
887 fn deeply_nested_scope_drop_no_stack_overflow() {
888 init();
889 let root = TaskScope::new();
890 {
891 let mut current = TaskScope::new_child(&root);
892 for _ in 0..199 {
893 current = TaskScope::new_child(¤t);
894 }
895 }
896 drop(root);
897 assert_eq!(executor::debug_task_count(), 0);
898 }
899
    #[test]
    fn scope_child_explicit_tree() {
        let root = TaskScope::new();
        let a = TaskScope::new_child(&root);
        let b = TaskScope::new_child(&root);
        let _a1 = TaskScope::new_child(&a);
        let _a2 = TaskScope::new_child(&a);
        // child_count counts direct children only, not grandchildren.
        assert_eq!(root.child_count(), 2);
        assert_eq!(a.child_count(), 2);
        assert_eq!(b.child_count(), 0);
    }
911
    // --- Callback handles ---

    #[test]
    fn callback_handle_dropped_before_tasks() {
        init();
        let dropped_order: Rc<RefCell<Vec<String>>> = Rc::new(RefCell::new(Vec::new()));
        {
            let scope = TaskScope::new();
            let order1 = Rc::clone(&dropped_order);
            scope.register_callback_handle(CallbackHandle::new(move || {
                order1.borrow_mut().push("callback".to_string());
            }));
            let order2 = Rc::clone(&dropped_order);
            struct DropCheck {
                order: Rc<RefCell<Vec<String>>>,
                label: String,
            }
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.order.borrow_mut().push(self.label.clone());
                }
            }
            scope.spawn(async move {
                let _guard = DropCheck {
                    order: order2,
                    label: "task".to_string(),
                };
                std::future::pending::<()>().await;
            });
        }
        // Cancellation order contract: callbacks are released before tasks.
        let order = dropped_order.borrow().clone();
        assert_eq!(order, vec!["callback", "task"]);
    }

    #[test]
    fn callback_handle_cleaned_up_on_child_scope_drop() {
        init();
        let called = Rc::new(Cell::new(false));
        {
            let parent = TaskScope::new();
            let child = TaskScope::new_child(&parent);
            let c = Rc::clone(&called);
            child.register_callback_handle(CallbackHandle::new(move || {
                c.set(true);
            }));
        }
        assert!(called.get());
    }
961
    // --- Context (provide/consume) ---

    #[test]
    fn context_provide_and_consume_in_same_scope() {
        let scope = TaskScope::new();
        scope.provide(42i32);
        assert_eq!(*scope.consume::<i32>().unwrap(), 42);
    }

    #[test]
    fn context_consume_walks_up_to_parent() {
        let parent = TaskScope::new();
        parent.provide("hello".to_string());
        let child = TaskScope::new_child(&parent);
        assert_eq!(*child.consume::<String>().unwrap(), "hello");
    }

    #[test]
    fn context_consume_not_found() {
        let scope = TaskScope::new();
        assert!(scope.consume::<i32>().is_none());
    }

    #[test]
    fn context_removed_on_scope_drop() {
        let parent = TaskScope::new();
        parent.provide(99u32);
        {
            let _child = TaskScope::new_child(&parent);
        }
        // Dropping a child must not disturb the parent's own context.
        assert_eq!(*parent.consume::<u32>().unwrap(), 99);
    }

    #[test]
    fn context_shadowing() {
        let parent = TaskScope::new();
        parent.provide(1i32);
        let child = TaskScope::new_child(&parent);
        child.provide(2i32);
        // Nearest provider wins; the parent's value is untouched.
        assert_eq!(*child.consume::<i32>().unwrap(), 2);
        assert_eq!(*parent.consume::<i32>().unwrap(), 1);
    }

    #[test]
    #[should_panic(expected = "context not found")]
    fn expect_context_panics_when_missing() {
        let scope = TaskScope::new();
        let _ = scope.expect_context::<String>();
    }
1015
    // --- Executor behaviour: priorities, batching, deferred sets, budget ---

    #[test]
    fn executor_priority_ordering() {
        init();
        let order = Rc::new(RefCell::new(Vec::new()));
        let o1 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::Low, async move {
            o1.borrow_mut().push("low");
        });
        let o2 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::High, async move {
            o2.borrow_mut().push("high");
        });
        executor::flush_all();
        // High-priority tasks must run before low, regardless of spawn order.
        let result = order.borrow().clone();
        assert_eq!(result, vec!["high", "low"]);
    }

    #[test]
    fn executor_batch() {
        init();
        let counter = Rc::new(Cell::new(0u32));
        for _ in 0..10 {
            let c = Rc::clone(&counter);
            spawn_global(async move {
                c.set(c.get() + 1);
            });
        }
        assert_eq!(counter.get(), 10);
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn no_leak_on_cancel() {
        init();
        for _ in 0..50 {
            let scope = TaskScope::new();
            for _ in 0..5 {
                scope.spawn(std::future::pending::<()>());
            }
        }
        // Every scope dropped at loop end; no task may survive.
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn set_deferred_triggers_after_flush() {
        use auralis_signal::Signal;
        init();
        let sig = Signal::new(0);
        let observed = Rc::new(Cell::new(0));
        set_deferred(&sig, 42);
        assert_eq!(sig.read(), 42);
        let ob1 = Rc::clone(&observed);
        spawn_global(async move {
            ob1.set(sig.read());
        });
        assert_eq!(observed.get(), 42);
    }

    #[test]
    fn set_deferred_in_drop_safe() {
        use auralis_signal::Signal;
        init();
        let sig = Signal::new(0);
        struct SetOnDrop {
            sig: Signal<i32>,
            val: i32,
        }
        impl Drop for SetOnDrop {
            fn drop(&mut self) {
                set_deferred(&self.sig, self.val);
            }
        }
        let guard = SetOnDrop {
            sig: sig.clone(),
            val: 99,
        };
        drop(guard);
        assert_eq!(sig.read(), 99);
    }

    #[test]
    fn set_deferred_from_drop_guard_during_scope_cancel() {
        use auralis_signal::Signal;
        init();

        let sig = Signal::new(0i32);

        // Guard that writes to the signal from inside task cancellation.
        struct ResetOnDrop {
            sig: Signal<i32>,
        }
        impl Drop for ResetOnDrop {
            fn drop(&mut self) {
                set_deferred(&self.sig, 42);
            }
        }

        {
            let scope = TaskScope::new();
            let s = sig.clone();
            scope.spawn(async move {
                let _guard = ResetOnDrop { sig: s };
                std::future::pending::<()>().await;
            });
        }

        assert_eq!(
            sig.read(),
            42,
            "set_deferred should have fired after scope cancel"
        );
    }

    #[test]
    fn yield_now_gives_other_tasks_a_turn() {
        init();
        let order = Rc::new(RefCell::new(Vec::new()));
        let o1 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::Low, async move {
            o1.borrow_mut().push("a1");
            executor::yield_now().await;
            o1.borrow_mut().push("a2");
        });
        let o2 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::Low, async move {
            o2.borrow_mut().push("b1");
            o2.borrow_mut().push("b2");
        });
        executor::flush_all();
        // Task b runs to completion while task a is parked at yield_now.
        let r = order.borrow().clone();
        assert_eq!(&r[0..3], &["a1", "b1", "b2"][..]);
        assert!(r.contains(&"a2"));
    }

    #[test]
    fn panic_in_task_is_isolated() {
        init();
        let survived = Rc::new(Cell::new(false));
        let s = Rc::clone(&survived);
        spawn_global(async move {
            panic!("intentional test panic");
        });
        spawn_global(async move {
            s.set(true);
        });
        assert!(survived.get());
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn time_budget_with_test_time_source() {
        init();
        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(ts.clone());

        let polled = Rc::new(Cell::new(0u32));

        // Each task advances the fake clock by 1ms; flush_all must still
        // drain everything under the test scheduler.
        for _ in 0..50 {
            let pc = Rc::clone(&polled);
            let ts_c = Rc::clone(&ts);
            executor::spawn_no_auto_flush(Priority::Low, async move {
                pc.set(pc.get() + 1);
                ts_c.advance(1);
            });
        }

        executor::flush_all();

        assert_eq!(polled.get(), 50);
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn time_budget_honoured_with_split() {
        let schedule_count = Rc::new(Cell::new(0u32));
        // Scheduler that only counts schedule requests, never runs them, so
        // the flush genuinely stops when the budget is spent.
        struct NoopScheduleFlush(Rc<Cell<u32>>);
        impl ScheduleFlush for NoopScheduleFlush {
            fn schedule(&self, _callback: Box<dyn FnOnce()>) {
                self.0.set(self.0.get() + 1);
            }
        }
        init_flush_scheduler(Rc::new(NoopScheduleFlush(Rc::clone(&schedule_count))));

        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(ts.clone());

        let polled = Rc::new(RefCell::new(Vec::new()));

        for i in 0..50u32 {
            let pc = Rc::clone(&polled);
            let ts_c = Rc::clone(&ts);
            executor::spawn_no_auto_flush(Priority::Low, async move {
                pc.borrow_mut().push(i);
                ts_c.advance(1);
            });
        }

        executor::flush_all();

        let completed = polled.borrow().len();
        assert!(
            completed < 50,
            "budget should split before all tasks run (only {completed} of 50)"
        );
        assert!(
            completed >= 7,
            "at least 7 tasks should run before budget expires ({completed})"
        );
        assert_eq!(
            schedule_count.get(),
            1,
            "next flush should have been scheduled exactly once"
        );

        // Restore the eager test scheduler and drain the remainder.
        init_flush_scheduler(Rc::new(TestScheduleFlush));
        executor::flush_all();
        assert_eq!(executor::debug_task_count(), 0);
    }
1256
    // --- Macros and debug dumps ---

    #[test]
    fn provide_context_macro_works() {
        let scope = TaskScope::new();
        provide_context!(scope, 42i32);
        assert_eq!(*scope.consume::<i32>().unwrap(), 42);
    }

    #[test]
    fn consume_context_macro_works() {
        let scope = TaskScope::new();
        scope.provide(99u32);
        let val: Option<Rc<u32>> = consume_context!(scope, u32);
        assert_eq!(*val.unwrap(), 99);
    }

    #[test]
    fn consume_context_macro_not_found() {
        let scope = TaskScope::new();
        let val: Option<Rc<String>> = consume_context!(scope, String);
        assert!(val.is_none());
    }

    #[cfg(feature = "debug")]
    #[test]
    fn dump_task_tree_returns_string() {
        init();
        let scope = TaskScope::new();
        scope.spawn(async { std::future::pending::<()>().await });

        let output = crate::dump_task_tree();
        assert!(output.contains("Auralis Task Tree"));
        assert!(output.contains("Total active tasks: 1"));
        assert!(output.contains("Scope"));
    }

    #[cfg(feature = "debug")]
    #[test]
    fn dump_task_tree_empty() {
        init();
        let output = crate::dump_task_tree();
        assert!(output.contains("(no active tasks)"));
    }
1303
1304 use crate::{set_deferred, spawn_global};
1305
    // --- Suspend / resume ---

    #[test]
    fn suspend_prevents_task_execution() {
        init();
        let scope = TaskScope::new();
        let executed = Rc::new(Cell::new(false));
        let ex = Rc::clone(&executed);
        scope.spawn(async move {
            ex.set(true);
        });
        // Baseline: an un-suspended scope runs the task immediately.
        assert!(executed.get());
        executed.set(false);

        scope.suspend();
        let ex2 = Rc::clone(&executed);
        scope.spawn(async move {
            ex2.set(true);
        });
        assert!(!executed.get());
    }

    #[test]
    fn resume_allows_task_execution() {
        init();
        let scope = TaskScope::new();
        scope.suspend();
        let executed = Rc::new(Cell::new(false));
        let ex = Rc::clone(&executed);
        scope.spawn(async move {
            ex.set(true);
        });
        assert!(!executed.get());

        // Resume re-enqueues the parked task on the executor.
        scope.resume();
        assert!(executed.get());
    }

    #[test]
    fn suspend_cascades_to_children() {
        init();
        let parent = TaskScope::new();
        let child = TaskScope::new_child(&parent);
        assert!(!child.is_suspended());

        parent.suspend();
        assert!(parent.is_suspended());
        assert!(child.is_suspended());
    }

    #[test]
    fn resume_cascades_to_children() {
        init();
        let parent = TaskScope::new();
        let child = TaskScope::new_child(&parent);
        parent.suspend();
        assert!(child.is_suspended());

        parent.resume();
        assert!(!parent.is_suspended());
        assert!(!child.is_suspended());
    }

    #[test]
    fn multiple_suspend_resume_no_leak() {
        init();
        let scope = TaskScope::new();
        for _ in 0..50 {
            scope.suspend();
            assert!(scope.is_suspended());
            scope.resume();
            assert!(!scope.is_suspended());
        }
    }

    #[test]
    fn suspended_scope_drops_without_panic() {
        init();
        {
            let scope = TaskScope::new();
            scope.suspend();
            let d = Rc::new(Cell::new(false));
            struct DropCheck(Rc<Cell<bool>>);
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.0.set(true);
                }
            }
            scope.spawn(async move {
                let _guard = DropCheck(d);
                std::future::pending::<()>().await;
            });
        }
        // Dropping a suspended scope still cancels its parked tasks.
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn siblings_not_affected_by_suspend() {
        init();
        let parent = TaskScope::new();
        let child_a = TaskScope::new_child(&parent);
        let child_b = TaskScope::new_child(&parent);

        // Suspension only flows downward, never sideways or upward.
        child_a.suspend();
        assert!(child_a.is_suspended());
        assert!(!child_b.is_suspended());
        assert!(!parent.is_suspended());
    }
1421
1422 use crate::Executor;
1425
    // --- Per-instance executors ---

    #[test]
    fn flush_instance_panicking_task_is_isolated() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));

        let survived = Rc::new(Cell::new(false));
        let s = Rc::clone(&survived);

        Executor::spawn(&ex, async move {
            panic!("intentional test panic in instance executor");
        });
        Executor::spawn(&ex, async move {
            s.set(true);
        });
        Executor::flush_instance(&ex);

        // The second task must run despite the first one panicking.
        assert!(survived.get());
    }

    #[test]
    fn flush_instance_spawn_and_complete() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));

        let counter = Rc::new(Cell::new(0u32));
        for _ in 0..20 {
            let c = Rc::clone(&counter);
            Executor::spawn(&ex, async move {
                c.set(c.get() + 1);
            });
        }
        Executor::flush_instance(&ex);
        assert_eq!(counter.get(), 20);
    }
1462
1463 use crate::timer;
1466
    // --- Timers and remaining integration checks ---

    #[test]
    fn timer_zero_duration_completes_immediately() {
        init();
        let done = Rc::new(Cell::new(false));
        let d = Rc::clone(&done);
        spawn_global(async move {
            timer::sleep(Duration::ZERO).await;
            d.set(true);
        });
        assert!(done.get());
    }

    #[test]
    fn timer_normal_delay_fires_after_time_advances() {
        init();
        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(Rc::clone(&ts) as Rc<dyn TimeSource>);

        let done = Rc::new(Cell::new(false));
        let d = Rc::clone(&done);
        spawn_global(async move {
            timer::sleep(Duration::from_millis(100)).await;
            d.set(true);
        });
        assert!(!done.get());

        // Advance past the deadline, then flush to fire the timer.
        ts.advance(150);
        crate::executor::flush_all();
        assert!(done.get());
    }

    #[test]
    fn timer_across_multiple_flushes() {
        init();
        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(Rc::clone(&ts) as Rc<dyn TimeSource>);

        let counter = Rc::new(Cell::new(0u32));
        let c = Rc::clone(&counter);
        spawn_global(async move {
            for _ in 0..3 {
                timer::sleep(Duration::from_millis(100)).await;
                c.set(c.get() + 1);
            }
        });
        assert_eq!(counter.get(), 0);

        // Each advance+flush wakes the task through exactly one sleep.
        ts.advance(100);
        crate::executor::flush_all();
        assert_eq!(counter.get(), 1);

        ts.advance(100);
        crate::executor::flush_all();
        assert_eq!(counter.get(), 2);

        ts.advance(100);
        crate::executor::flush_all();
        assert_eq!(counter.get(), 3);
    }

    #[test]
    fn timer_cancelled_by_scope_drop() {
        init();
        let executed = Rc::new(Cell::new(false));
        let ex = Rc::clone(&executed);
        {
            let scope = TaskScope::new();
            scope.spawn(async move {
                timer::sleep(Duration::from_millis(500)).await;
                ex.set(true);
            });
        }
        assert!(!executed.get());
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn reentrant_flush_is_noop() {
        init();
        let reentered = Rc::new(Cell::new(false));
        let r = Rc::clone(&reentered);
        let sig = Signal::new(0);
        auralis_signal::subscribe(&sig, Rc::new(move || r.set(true)));
        sig.set(1);
        assert!(reentered.get());
    }

    #[test]
    fn instance_executor_timer() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));
        let ts = Rc::new(TestTimeSource::new(0));
        Executor::install_time_source(&ex, Rc::clone(&ts) as Rc<dyn TimeSource>);

        let done = Rc::new(Cell::new(false));
        let d = Rc::clone(&done);
        Executor::spawn(&ex, async move {
            timer::sleep(Duration::from_millis(50)).await;
            d.set(true);
        });
        assert!(!done.get());

        ts.advance(60);
        Executor::flush_instance(&ex);
        assert!(done.get());
    }

    #[test]
    fn set_deferred_routes_to_instance_executor() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));

        let sig = Signal::new(0);
        let s = sig.clone();

        Executor::spawn(&ex, async move {
            crate::set_deferred(&s, 42);
        });

        Executor::flush_instance(&ex);
        assert_eq!(sig.read(), 42);
    }

    #[test]
    fn panic_hook_is_invoked_on_task_panic() {
        init();
        let hook_called = Rc::new(Cell::new(false));
        let hc = Rc::clone(&hook_called);

        crate::set_panic_hook(Rc::new(move |_info| {
            hc.set(true);
        }));

        let scope = TaskScope::new();
        scope.spawn(async move { panic!("intentional") });

        assert!(hook_called.get());
    }

    #[test]
    fn current_scope_available_in_spawned_task() {
        init();
        let scope = TaskScope::new();
        let found = Rc::new(Cell::new(false));
        let f = Rc::clone(&found);
        scope.spawn(async move {
            f.set(crate::current_scope().is_some());
        });
        assert!(found.get());
    }

    #[test]
    fn callback_handle_noop_does_not_panic() {
        let _h = crate::CallbackHandle::noop();
    }

    #[test]
    fn sync_callback_fallback_without_schedule_hook() {
        // Deliberately no flush scheduler installed here.
        crate::reset_executor_for_test();
        let sig = Signal::new(0);
        let called = Rc::new(Cell::new(false));
        let c = Rc::clone(&called);
        auralis_signal::subscribe(&sig, Rc::new(move || c.set(true)));

        sig.set(1);
        assert!(called.get());
    }

    #[test]
    fn set_deferred_isolated_to_instance_executor() {
        init();
        let ex1 = Executor::new_instance();
        Executor::install_flush_scheduler(&ex1, Rc::new(TestScheduleFlush));
        let ex2 = Executor::new_instance();
        Executor::install_flush_scheduler(&ex2, Rc::new(TestScheduleFlush));

        let sig1 = Signal::new(0);
        let sig2 = Signal::new(0);
        let s1 = sig1.clone();

        crate::with_executor(&ex1, || {
            crate::set_deferred(&s1, 42);
        });
        Executor::flush_instance(&ex1);
        // Only the signal deferred on ex1 changes; sig2 is untouched.
        assert_eq!(sig1.read(), 42);
        assert_eq!(sig2.read(), 0);
    }

    #[test]
    fn notify_signal_state_follow_up_handles_reentrant_dirty() {
        let sig = Signal::new(0);
        let sig2 = sig.clone();
        let count = Rc::new(Cell::new(0u32));
        let c = Rc::clone(&count);

        // The subscriber re-dirties the signal on its first invocation.
        auralis_signal::subscribe(
            &sig,
            Rc::new(move || {
                c.set(c.get() + 1);
                if c.get() == 1 {
                    sig2.set(2);
                }
            }),
        );

        sig.set(1);
        assert_eq!(sig.read(), 2);
        assert_eq!(count.get(), 2);
    }
1713}