1use std::any::{Any, TypeId};
6use std::cell::{Cell, RefCell};
7use std::collections::{HashMap, VecDeque};
8use std::future::Future;
9use std::pin::Pin;
10use std::rc::{Rc, Weak};
11
12use crate::executor;
13use crate::Priority;
14
/// Identifier of a [`TaskScope`], unique per thread.
type ScopeId = u64;
/// Identifier the executor assigns to a spawned task.
type TaskId = u64;

thread_local! {
    // Per-thread id counter; the first allocated id is 1.
    static NEXT_SCOPE_ID: Cell<ScopeId> = const { Cell::new(1) };
}

/// Hands out the next thread-unique scope id.
fn alloc_scope_id() -> ScopeId {
    NEXT_SCOPE_ID.with(|counter| {
        let current = counter.get();
        counter.set(current + 1);
        current
    })
}
33
/// RAII guard that runs a cleanup closure exactly once, when dropped.
///
/// Scopes hold these so registered callbacks are torn down when the scope
/// is cancelled or dropped.
pub struct CallbackHandle {
    // `None` for a no-op handle, or once the cleanup has already fired.
    cleanup: Option<Box<dyn FnOnce() + 'static>>,
}

impl CallbackHandle {
    /// Wraps `cleanup` so it runs when this handle is dropped.
    pub fn new(cleanup: impl FnOnce() + 'static) -> Self {
        let boxed: Box<dyn FnOnce() + 'static> = Box::new(cleanup);
        Self {
            cleanup: Some(boxed),
        }
    }

    /// Returns a handle whose drop does nothing.
    #[must_use]
    pub fn noop() -> Self {
        Self { cleanup: None }
    }
}

impl Drop for CallbackHandle {
    fn drop(&mut self) {
        // `take()` guarantees the closure cannot run twice.
        if let Some(run) = self.cleanup.take() {
            run();
        }
    }
}
74
75type ScopeRegistryEntry = (Weak<RefCell<TaskScopeInner>>, Weak<Cell<bool>>);
96
97thread_local! {
98 static SCOPE_REGISTRY: RefCell<HashMap<ScopeId, ScopeRegistryEntry>> =
99 RefCell::new(HashMap::new());
100}
101
102fn register_scope(id: ScopeId, inner: &Rc<RefCell<TaskScopeInner>>, suspended: &Rc<Cell<bool>>) {
105 let _ = SCOPE_REGISTRY.try_with(|reg| {
106 if let Ok(mut r) = reg.try_borrow_mut() {
107 r.insert(id, (Rc::downgrade(inner), Rc::downgrade(suspended)));
108 }
109 });
110}
111
112fn unregister_scope(id: ScopeId) {
113 let _ = SCOPE_REGISTRY.try_with(|reg| {
114 if let Ok(mut r) = reg.try_borrow_mut() {
115 r.remove(&id);
116 }
117 });
118}
119
120#[must_use]
124pub fn find_scope(scope_id: ScopeId) -> Option<TaskScope> {
125 SCOPE_REGISTRY
126 .try_with(|reg| {
127 if let Ok(r) = reg.try_borrow() {
128 r.get(&scope_id).and_then(|(inner_weak, suspended_weak)| {
129 let inner = inner_weak.upgrade()?;
130 let suspended = suspended_weak.upgrade()?;
131 Some(TaskScope { inner, suspended })
132 })
133 } else {
134 None
135 }
136 })
137 .ok()
138 .flatten()
139}
140
141#[cfg(feature = "debug")]
145#[doc(hidden)]
146#[must_use]
147pub fn scope_debug_label(scope_id: ScopeId) -> Option<String> {
148 find_scope(scope_id).and_then(|s| s.inner.borrow().debug_label.clone())
149}
150
151#[doc(hidden)]
153pub fn clear_scope_registry() {
154 let _ = SCOPE_REGISTRY.try_with(|reg| {
155 if let Ok(mut r) = reg.try_borrow_mut() {
156 r.clear();
157 }
158 });
159}
160
/// Installs (or clears, with `None`) the "current" scope for this context.
type ScopeSetFn = fn(Option<TaskScope>);
/// Reads back the scope last installed by the matching [`ScopeSetFn`].
type ScopeGetFn = fn() -> Option<TaskScope>;

/// Pluggable storage backend for the "current scope" (thread-local by
/// default; replaceable via [`set_scope_store`], e.g. for task-local SSR).
pub struct ScopeStore {
    /// Writes the current scope.
    pub set_fn: ScopeSetFn,
    /// Reads the current scope.
    pub get_fn: ScopeGetFn,
}

use std::sync::OnceLock;
// Process-wide store; lazily initialised with the thread-local backend
// unless `set_scope_store` wins the race first.
static SCOPE_STORE: OnceLock<ScopeStore> = OnceLock::new();

/// Returns the installed store, initialising the thread-local default on
/// first use.
fn ensure_default_store() -> &'static ScopeStore {
    SCOPE_STORE.get_or_init(|| ScopeStore {
        set_fn: thread_local_set,
        get_fn: thread_local_get,
    })
}

/// Installs a custom scope store.
///
/// Best-effort: if a store was already installed (or the default was
/// already initialised by an earlier scope access), the new store is
/// silently ignored — `OnceLock::set` can only succeed once.
pub fn set_scope_store(store: ScopeStore) {
    let _ = SCOPE_STORE.set(store);
}

thread_local! {
    // Default backend: one current-scope slot per thread.
    static CURRENT_SCOPE: RefCell<Option<TaskScope>> = const { RefCell::new(None) };
}

/// Default `set_fn`: stores the scope in the thread-local slot.
fn thread_local_set(scope: Option<TaskScope>) {
    CURRENT_SCOPE.with(|cell| {
        cell.replace(scope);
    });
}

/// Default `get_fn`: clones the scope out of the thread-local slot.
fn thread_local_get() -> Option<TaskScope> {
    CURRENT_SCOPE.with(|cell| cell.borrow().clone())
}

/// Sets the current scope through whichever store is installed.
pub(crate) fn set_scope_direct(scope: Option<TaskScope>) {
    let store = ensure_default_store();
    (store.set_fn)(scope);
}

/// Reads the current scope through whichever store is installed.
pub(crate) fn get_scope_direct() -> Option<TaskScope> {
    let store = ensure_default_store();
    (store.get_fn)()
}
269
/// Installs a tokio task-local backend for the current scope, so each
/// tokio task tracks its own scope instead of sharing a thread-local slot.
///
/// NOTE(review): tokio task-locals are only populated inside
/// `LocalKey::scope(...)`; outside such a scope, `try_with` fails and these
/// accessors become silent no-ops (set drops the scope, get returns
/// `None`). Confirm callers establish the task-local scope.
#[cfg(feature = "ssr-tokio")]
pub fn init_scope_store_tokio() {
    tokio::task_local! {
        static TK_SCOPE: std::cell::RefCell<Option<TaskScope>>;
    }

    // Best-effort reset of the slot if one is already in scope.
    let _ = TK_SCOPE.try_with(|cell| {
        cell.replace(None);
    });

    // Best-effort install: ignored if a store is already set (see
    // `set_scope_store`).
    set_scope_store(ScopeStore {
        set_fn: |s| {
            let _ = TK_SCOPE.try_with(|cell| {
                cell.replace(s);
            });
        },
        get_fn: || {
            TK_SCOPE
                .try_with(|cell| cell.borrow().clone())
                .ok()
                .flatten()
        },
    });
}
310
311pub fn with_current_scope<R>(scope: &TaskScope, f: impl FnOnce() -> R) -> R {
321 let store = ensure_default_store();
322 let prev = (store.get_fn)();
323 (store.set_fn)(Some(scope.clone_inner()));
324 let result = f();
325 (store.set_fn)(prev);
326 result
327}
328
329#[must_use]
331pub fn current_scope() -> Option<TaskScope> {
332 let store = ensure_default_store();
333 (store.get_fn)()
334}
335
/// Shared mutable state behind one or more [`TaskScope`] handles.
struct TaskScopeInner {
    /// Thread-unique id; keys the scope registry and scoped executor tasks.
    id: ScopeId,
    /// Ids of tasks spawned on this scope; cancelled when the scope drops.
    task_ids: Vec<TaskId>,
    /// Child scopes, cancelled together with this scope.
    children: Vec<TaskScope>,
    /// Weak link to the parent, used by `consume` to walk up the tree.
    parent: Option<Weak<RefCell<TaskScopeInner>>>,
    /// Type-keyed context values provided on this scope.
    context: RefCell<HashMap<TypeId, Rc<dyn Any>>>,
    /// Callback guards; clearing this vec runs their cleanups.
    callbacks: RefCell<Vec<CallbackHandle>>,
    /// Optional human-readable label for debug dumps.
    #[cfg(feature = "debug")]
    debug_label: Option<String>,
    /// Set once, on cancellation; cancelled scopes refuse new work.
    cancelled: bool,
    /// Executor this scope spawns onto.
    executor: executor::ExecutorRef,
}

/// Cloneable handle to a task scope.
///
/// Clones share state via `Rc`; when the last clone drops, the scope's
/// tasks, callbacks, context, and descendant scopes are all cancelled
/// (see the `Drop` impl).
#[must_use]
pub struct TaskScope {
    inner: Rc<RefCell<TaskScopeInner>>,
    /// Shared suspension flag; the registry holds a weak reference to it.
    suspended: Rc<Cell<bool>>,
}
395
impl TaskScope {
    /// Creates a root scope bound to the thread's current executor.
    pub fn new() -> Self {
        Self::with_executor(&executor::current_executor_instance())
    }

    /// Creates a root scope bound to the given executor instance and
    /// registers it in the per-thread scope registry.
    pub fn with_executor(ex: &executor::ExecutorRef) -> Self {
        let inner = Rc::new(RefCell::new(TaskScopeInner {
            id: alloc_scope_id(),
            task_ids: Vec::new(),
            children: Vec::new(),
            parent: None,
            context: RefCell::new(HashMap::new()),
            callbacks: RefCell::new(Vec::new()),
            #[cfg(feature = "debug")]
            debug_label: None,
            cancelled: false,
            executor: Rc::clone(ex),
        }));
        let id = inner.borrow().id;
        let suspended = Rc::new(Cell::new(false));
        register_scope(id, &inner, &suspended);
        Self { inner, suspended }
    }

    /// Creates a child scope that shares `parent`'s executor and is
    /// cancelled when `parent` is dropped. The parent keeps a strong
    /// handle to the child in its `children` list.
    pub fn new_child(parent: &Self) -> Self {
        let ex = parent.inner.borrow().executor.clone();
        let inner = Rc::new(RefCell::new(TaskScopeInner {
            id: alloc_scope_id(),
            task_ids: Vec::new(),
            children: Vec::new(),
            parent: Some(Rc::downgrade(&parent.inner)),
            context: RefCell::new(HashMap::new()),
            callbacks: RefCell::new(Vec::new()),
            #[cfg(feature = "debug")]
            debug_label: None,
            cancelled: false,
            executor: ex,
        }));
        let id = inner.borrow().id;
        let suspended = Rc::new(Cell::new(false));
        register_scope(id, &inner, &suspended);
        let child = Self { inner, suspended };
        parent.inner.borrow_mut().children.push(child.clone_inner());
        child
    }

    /// Spawns `future` on this scope at low priority. See
    /// [`Self::spawn_with_priority`].
    pub fn spawn(&self, future: impl Future<Output = ()> + 'static) {
        self.spawn_with_priority(Priority::Low, future);
    }

    /// Spawns `future` on this scope's executor at the given priority,
    /// with this scope installed as the current scope for the spawn.
    ///
    /// No-op if the scope has been cancelled.
    pub fn spawn_with_priority(
        &self,
        priority: Priority,
        future: impl Future<Output = ()> + 'static,
    ) {
        let inner = self.inner.borrow();
        if inner.cancelled {
            return;
        }
        let ex = Rc::clone(&inner.executor);
        let task_id = executor::with_executor(&ex, || {
            with_current_scope(self, || {
                executor::spawn_scoped_on(&ex, priority, inner.id, future)
            })
        });
        // Release the shared borrow before taking the mutable borrow below;
        // holding both would panic the RefCell.
        drop(inner);
        self.inner.borrow_mut().task_ids.push(task_id);
    }

    /// Ties `handle`'s cleanup to this scope's lifetime.
    ///
    /// If the scope is already cancelled, `handle` is dropped immediately
    /// (running its cleanup now rather than never).
    pub fn register_callback_handle(&self, handle: CallbackHandle) {
        let inner = self.inner.borrow();
        if inner.cancelled {
            return;
        }
        inner.callbacks.borrow_mut().push(handle);
    }

    /// Stores `value` in this scope's context, keyed by its type.
    /// Replaces any previous value of the same type on this scope.
    pub fn provide<T: 'static>(&self, value: T) {
        self.inner
            .borrow()
            .context
            .borrow_mut()
            .insert(TypeId::of::<T>(), Rc::new(value));
    }

    /// Looks up a context value of type `T`, walking from this scope up
    /// through its ancestors; the nearest provider wins.
    #[must_use]
    pub fn consume<T: 'static>(&self) -> Option<Rc<T>> {
        let mut current = Some(Rc::clone(&self.inner));

        while let Some(inner) = current {
            // Inner block bounds the borrows before we move up the tree.
            {
                let inner_ref = inner.borrow();
                let ctx = inner_ref.context.borrow();
                if let Some(val) = ctx.get(&TypeId::of::<T>()) {
                    if let Ok(downcast) = val.clone().downcast::<T>() {
                        return Some(downcast);
                    }
                }
            }

            let parent = {
                let inner_ref = inner.borrow();
                inner_ref.parent.as_ref().and_then(Weak::upgrade)
            };
            current = parent;
        }

        None
    }

    /// Like [`Self::consume`], but panics if no ancestor provides `T`.
    #[must_use]
    #[track_caller]
    pub fn expect_context<T: 'static>(&self) -> Rc<T> {
        self.consume::<T>()
            .unwrap_or_else(|| panic!("context not found: {}", std::any::type_name::<T>()))
    }

    /// Returns `true` once the scope has been cancelled.
    #[must_use]
    pub fn is_cancelled(&self) -> bool {
        self.inner.borrow().cancelled
    }

    /// Attaches a human-readable label for debug dumps.
    #[cfg(feature = "debug")]
    pub fn set_debug_label(&self, label: impl Into<String>) {
        self.inner.borrow_mut().debug_label = Some(label.into());
    }

    /// Number of tasks ever spawned on this scope (test helper).
    #[cfg(test)]
    #[must_use]
    pub fn task_count(&self) -> usize {
        self.inner.borrow().task_ids.len()
    }

    /// Number of direct children (test helper).
    #[cfg(test)]
    #[must_use]
    pub fn child_count(&self) -> usize {
        self.inner.borrow().children.len()
    }

    /// Cheap handle clone sharing the same underlying scope state.
    fn clone_inner(&self) -> Self {
        Self {
            inner: Rc::clone(&self.inner),
            suspended: Rc::clone(&self.suspended),
        }
    }

    /// Runs `f` with this scope installed as the current scope.
    pub fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
        with_current_scope(self, f)
    }

    /// Suspends this scope and, recursively, all of its children.
    /// Idempotent: a second call is a no-op.
    pub fn suspend(&self) {
        if self.suspended.get() {
            return;
        }
        self.suspended.set(true);
        // Clone the child handles out first so no borrow is held while
        // recursing into children.
        let children: Vec<TaskScope> = {
            self.inner
                .borrow()
                .children
                .iter()
                .map(TaskScope::clone_inner)
                .collect()
        };
        for child in &children {
            child.suspend();
        }
    }

    /// Resumes this scope, re-enqueues its pending tasks on the executor,
    /// then resumes all children. No-op if not suspended.
    pub fn resume(&self) {
        if !self.suspended.get() {
            return;
        }
        self.suspended.set(false);

        let (scope_id, children) = {
            let inner = self.inner.borrow();
            let id = inner.id;
            let children: Vec<TaskScope> =
                inner.children.iter().map(TaskScope::clone_inner).collect();
            (id, children)
        };

        let ex = Rc::clone(&self.inner.borrow().executor);
        executor::enqueue_scope_tasks_on(&ex, scope_id);

        for child in &children {
            child.resume();
        }
    }

    /// Returns `true` while the scope is suspended.
    #[must_use]
    pub fn is_suspended(&self) -> bool {
        self.suspended.get()
    }
}
667
668impl Default for TaskScope {
669 fn default() -> Self {
670 Self::new()
671 }
672}
673
674impl Clone for TaskScope {
675 fn clone(&self) -> Self {
676 self.clone_inner()
677 }
678}
679
impl Drop for TaskScope {
    /// Cancels the scope when the last handle is dropped.
    ///
    /// Teardown order: this scope's callbacks, then descendants
    /// (deepest first), then this scope's own tasks and context.
    fn drop(&mut self) {
        // Only the final clone performs cancellation; earlier clones
        // dropping is a no-op.
        if Rc::strong_count(&self.inner) > 1 {
            return;
        }

        // If inner is already borrowed (e.g. the last clone is dropped
        // from inside a callback), we cannot cancel safely — warn and
        // bail rather than panic; the tasks/callbacks leak.
        let Ok(mut inner) = self.inner.try_borrow_mut() else {
            #[cfg(debug_assertions)]
            {
                eprintln!(
                    "[auralis-task] WARNING: TaskScope::drop cannot borrow inner \
                    (already borrowed). If this was the last clone, tasks and \
                    callbacks will leak. Avoid dropping the last TaskScope clone \
                    inside a callback or during executor flush."
                );
            }
            return;
        };
        if inner.cancelled {
            return;
        }
        inner.cancelled = true;

        // Callbacks are cleaned up before tasks are cancelled; tests rely
        // on this ordering.
        inner.callbacks.borrow_mut().clear();

        // Collect every descendant iteratively (breadth-first) instead of
        // recursing in Drop, so deep scope chains cannot overflow the stack.
        let mut descendants: Vec<Rc<RefCell<TaskScopeInner>>> = Vec::new();
        {
            let mut queue: VecDeque<Rc<RefCell<TaskScopeInner>>> = VecDeque::new();
            for child in &inner.children {
                queue.push_back(Rc::clone(&child.inner));
            }

            while let Some(scope_rc) = queue.pop_front() {
                let scope = scope_rc.borrow();
                for child in &scope.children {
                    queue.push_back(Rc::clone(&child.inner));
                }
                descendants.push(Rc::clone(&scope_rc));
            }
        }

        // BFS order puts deeper scopes later, so iterating in reverse
        // cancels children before their parents.
        for scope_rc in descendants.iter().rev() {
            let mut scope = scope_rc.borrow_mut();
            if scope.cancelled {
                continue;
            }
            scope.cancelled = true;

            scope.callbacks.borrow_mut().clear();

            if !scope.task_ids.is_empty() {
                let ex = Rc::clone(&scope.executor);
                // Take ownership of the cancelled futures and drop them
                // here, running their destructors (and any drop guards).
                let dropped_futures: Vec<Pin<Box<dyn Future<Output = ()>>>> =
                    executor::cancel_scope_tasks_on(&ex, scope.id);
                drop(dropped_futures);
            }
            scope.context.borrow_mut().clear();
            unregister_scope(scope.id);
        }

        // Finally cancel this scope's own tasks and release its state.
        if !inner.task_ids.is_empty() {
            let ex = Rc::clone(&inner.executor);
            let dropped_futures = executor::cancel_scope_tasks_on(&ex, inner.id);
            drop(dropped_futures);
        }

        inner.context.borrow_mut().clear();
        inner.children.clear();

        unregister_scope(inner.id);
    }
}
771
/// Stores `$value` in `$scope`'s typed context map.
///
/// Shorthand for [`TaskScope::provide`].
#[macro_export]
macro_rules! provide_context {
    ($scope:expr, $value:expr) => {
        $scope.provide($value)
    };
}

/// Looks up a value of type `$ty` in `$scope` or its ancestors.
///
/// Shorthand for [`TaskScope::consume`]; evaluates to `Option<Rc<$ty>>`.
#[macro_export]
macro_rules! consume_context {
    ($scope:expr, $ty:ty) => {
        $scope.consume::<$ty>()
    };
}
799
// Unit tests for scopes, context, suspension, timers, and executor
// integration. Fix applied: `deeply_nested_scope_drop_no_stack_overflow`
// contained mojibake (`¤t`, an HTML-entity mangling of `&current`)
// that prevented compilation.
#[cfg(test)]
#[allow(clippy::items_after_statements)]
mod tests {
    use super::*;
    use crate::executor::{self, init_flush_scheduler, reset_executor_for_test, TestScheduleFlush};
    use crate::{init_time_source, ScheduleFlush, TestTimeSource, TimeSource};
    use auralis_signal::Signal;
    use std::cell::{Cell, RefCell};
    use std::rc::Rc;
    use std::time::Duration;

    /// Resets global executor state so each test starts clean.
    fn init() {
        reset_executor_for_test();
        init_flush_scheduler(Rc::new(TestScheduleFlush));
    }

    #[test]
    fn new_scope_has_zero_tasks() {
        let scope = TaskScope::new();
        assert_eq!(scope.task_count(), 0);
        assert_eq!(scope.child_count(), 0);
    }

    #[test]
    fn new_child_attaches_to_parent() {
        let parent = TaskScope::new();
        let _child = TaskScope::new_child(&parent);
        assert_eq!(parent.child_count(), 1);
    }

    #[test]
    fn spawn_adds_task() {
        init();
        let scope = TaskScope::new();
        scope.spawn(async {});
        assert_eq!(scope.task_count(), 1);
    }

    #[test]
    fn spawn_and_complete() {
        init();
        let done = Rc::new(Cell::new(false));
        let done2 = Rc::clone(&done);
        spawn_global(async move {
            done2.set(true);
        });
        assert!(done.get());
    }

    #[test]
    fn scope_spawn_and_cancel() {
        init();
        let dropped = Rc::new(Cell::new(false));
        {
            let scope = TaskScope::new();
            let d = Rc::clone(&dropped);
            struct DropCheck(Rc<Cell<bool>>);
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.0.set(true);
                }
            }
            scope.spawn(async move {
                let _guard = DropCheck(d);
                std::future::pending::<()>().await;
            });
            assert_eq!(executor::debug_task_count(), 1);
        }
        assert!(dropped.get());
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn nested_scope_child_cancel_with_parent() {
        init();
        let dropped_child = Rc::new(Cell::new(false));
        {
            let parent = TaskScope::new();
            let child = TaskScope::new_child(&parent);
            let d = Rc::clone(&dropped_child);
            struct DropCheck(Rc<Cell<bool>>);
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.0.set(true);
                }
            }
            child.spawn(async move {
                let _guard = DropCheck(d);
                std::future::pending::<()>().await;
            });
            assert_eq!(executor::debug_task_count(), 1);
        }
        assert!(dropped_child.get());
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn deeply_nested_scope_drop_no_stack_overflow() {
        init();
        let root = TaskScope::new();
        {
            // Build a 200-deep chain; Drop must tear it down iteratively.
            let mut current = TaskScope::new_child(&root);
            for _ in 0..199 {
                current = TaskScope::new_child(&current);
            }
        }
        drop(root);
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn scope_child_explicit_tree() {
        let root = TaskScope::new();
        let a = TaskScope::new_child(&root);
        let b = TaskScope::new_child(&root);
        let _a1 = TaskScope::new_child(&a);
        let _a2 = TaskScope::new_child(&a);
        assert_eq!(root.child_count(), 2);
        assert_eq!(a.child_count(), 2);
        assert_eq!(b.child_count(), 0);
    }

    #[test]
    fn callback_handle_dropped_before_tasks() {
        init();
        let dropped_order: Rc<RefCell<Vec<String>>> = Rc::new(RefCell::new(Vec::new()));
        {
            let scope = TaskScope::new();
            let order1 = Rc::clone(&dropped_order);
            scope.register_callback_handle(CallbackHandle::new(move || {
                order1.borrow_mut().push("callback".to_string());
            }));
            let order2 = Rc::clone(&dropped_order);
            struct DropCheck {
                order: Rc<RefCell<Vec<String>>>,
                label: String,
            }
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.order.borrow_mut().push(self.label.clone());
                }
            }
            scope.spawn(async move {
                let _guard = DropCheck {
                    order: order2,
                    label: "task".to_string(),
                };
                std::future::pending::<()>().await;
            });
        }
        let order = dropped_order.borrow().clone();
        assert_eq!(order, vec!["callback", "task"]);
    }

    #[test]
    fn callback_handle_cleaned_up_on_child_scope_drop() {
        init();
        let called = Rc::new(Cell::new(false));
        {
            let parent = TaskScope::new();
            let child = TaskScope::new_child(&parent);
            let c = Rc::clone(&called);
            child.register_callback_handle(CallbackHandle::new(move || {
                c.set(true);
            }));
        }
        assert!(called.get());
    }

    #[test]
    fn context_provide_and_consume_in_same_scope() {
        let scope = TaskScope::new();
        scope.provide(42i32);
        assert_eq!(*scope.consume::<i32>().unwrap(), 42);
    }

    #[test]
    fn context_consume_walks_up_to_parent() {
        let parent = TaskScope::new();
        parent.provide("hello".to_string());
        let child = TaskScope::new_child(&parent);
        assert_eq!(*child.consume::<String>().unwrap(), "hello");
    }

    #[test]
    fn context_consume_not_found() {
        let scope = TaskScope::new();
        assert!(scope.consume::<i32>().is_none());
    }

    #[test]
    fn context_removed_on_scope_drop() {
        let parent = TaskScope::new();
        parent.provide(99u32);
        {
            let _child = TaskScope::new_child(&parent);
        }
        assert_eq!(*parent.consume::<u32>().unwrap(), 99);
    }

    #[test]
    fn context_shadowing() {
        let parent = TaskScope::new();
        parent.provide(1i32);
        let child = TaskScope::new_child(&parent);
        child.provide(2i32);
        assert_eq!(*child.consume::<i32>().unwrap(), 2);
        assert_eq!(*parent.consume::<i32>().unwrap(), 1);
    }

    #[test]
    #[should_panic(expected = "context not found")]
    fn expect_context_panics_when_missing() {
        let scope = TaskScope::new();
        let _ = scope.expect_context::<String>();
    }

    #[test]
    fn executor_priority_ordering() {
        init();
        let order = Rc::new(RefCell::new(Vec::new()));
        let o1 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::Low, async move {
            o1.borrow_mut().push("low");
        });
        let o2 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::High, async move {
            o2.borrow_mut().push("high");
        });
        executor::flush_all();
        let result = order.borrow().clone();
        assert_eq!(result, vec!["high", "low"]);
    }

    #[test]
    fn executor_batch() {
        init();
        let counter = Rc::new(Cell::new(0u32));
        for _ in 0..10 {
            let c = Rc::clone(&counter);
            spawn_global(async move {
                c.set(c.get() + 1);
            });
        }
        assert_eq!(counter.get(), 10);
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn no_leak_on_cancel() {
        init();
        for _ in 0..50 {
            let scope = TaskScope::new();
            for _ in 0..5 {
                scope.spawn(std::future::pending::<()>());
            }
        }
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn set_deferred_triggers_after_flush() {
        use auralis_signal::Signal;
        init();
        let sig = Signal::new(0);
        let observed = Rc::new(Cell::new(0));
        set_deferred(&sig, 42);
        assert_eq!(sig.read(), 42);
        let ob1 = Rc::clone(&observed);
        spawn_global(async move {
            ob1.set(sig.read());
        });
        assert_eq!(observed.get(), 42);
    }

    #[test]
    fn set_deferred_in_drop_safe() {
        use auralis_signal::Signal;
        init();
        let sig = Signal::new(0);
        struct SetOnDrop {
            sig: Signal<i32>,
            val: i32,
        }
        impl Drop for SetOnDrop {
            fn drop(&mut self) {
                set_deferred(&self.sig, self.val);
            }
        }
        let guard = SetOnDrop {
            sig: sig.clone(),
            val: 99,
        };
        drop(guard);
        assert_eq!(sig.read(), 99);
    }

    #[test]
    fn set_deferred_from_drop_guard_during_scope_cancel() {
        use auralis_signal::Signal;
        init();

        let sig = Signal::new(0i32);

        struct ResetOnDrop {
            sig: Signal<i32>,
        }
        impl Drop for ResetOnDrop {
            fn drop(&mut self) {
                set_deferred(&self.sig, 42);
            }
        }

        {
            let scope = TaskScope::new();
            let s = sig.clone();
            scope.spawn(async move {
                let _guard = ResetOnDrop { sig: s };
                std::future::pending::<()>().await;
            });
        }

        assert_eq!(
            sig.read(),
            42,
            "set_deferred should have fired after scope cancel"
        );
    }

    #[test]
    fn yield_now_gives_other_tasks_a_turn() {
        init();
        let order = Rc::new(RefCell::new(Vec::new()));
        let o1 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::Low, async move {
            o1.borrow_mut().push("a1");
            executor::yield_now().await;
            o1.borrow_mut().push("a2");
        });
        let o2 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::Low, async move {
            o2.borrow_mut().push("b1");
            o2.borrow_mut().push("b2");
        });
        executor::flush_all();
        let r = order.borrow().clone();
        assert_eq!(&r[0..3], &["a1", "b1", "b2"][..]);
        assert!(r.contains(&"a2"));
    }

    #[test]
    fn panic_in_task_is_isolated() {
        init();
        let survived = Rc::new(Cell::new(false));
        let s = Rc::clone(&survived);
        spawn_global(async move {
            panic!("intentional test panic");
        });
        spawn_global(async move {
            s.set(true);
        });
        assert!(survived.get());
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn time_budget_with_test_time_source() {
        init();
        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(ts.clone());

        let polled = Rc::new(Cell::new(0u32));

        for _ in 0..50 {
            let pc = Rc::clone(&polled);
            let ts_c = Rc::clone(&ts);
            executor::spawn_no_auto_flush(Priority::Low, async move {
                pc.set(pc.get() + 1);
                ts_c.advance(1);
            });
        }

        executor::flush_all();

        assert_eq!(polled.get(), 50);
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn time_budget_honoured_with_split() {
        let schedule_count = Rc::new(Cell::new(0u32));
        struct NoopScheduleFlush(Rc<Cell<u32>>);
        impl ScheduleFlush for NoopScheduleFlush {
            fn schedule(&self, _callback: Box<dyn FnOnce()>) {
                self.0.set(self.0.get() + 1);
            }
        }
        init_flush_scheduler(Rc::new(NoopScheduleFlush(Rc::clone(&schedule_count))));

        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(ts.clone());

        let polled = Rc::new(RefCell::new(Vec::new()));

        for i in 0..50u32 {
            let pc = Rc::clone(&polled);
            let ts_c = Rc::clone(&ts);
            executor::spawn_no_auto_flush(Priority::Low, async move {
                pc.borrow_mut().push(i);
                ts_c.advance(1);
            });
        }

        executor::flush_all();

        let completed = polled.borrow().len();
        assert!(
            completed < 50,
            "budget should split before all tasks run (only {completed} of 50)"
        );
        assert!(
            completed >= 7,
            "at least 7 tasks should run before budget expires ({completed})"
        );
        assert_eq!(
            schedule_count.get(),
            1,
            "next flush should have been scheduled exactly once"
        );

        init_flush_scheduler(Rc::new(TestScheduleFlush));
        executor::flush_all();
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn provide_context_macro_works() {
        let scope = TaskScope::new();
        provide_context!(scope, 42i32);
        assert_eq!(*scope.consume::<i32>().unwrap(), 42);
    }

    #[test]
    fn consume_context_macro_works() {
        let scope = TaskScope::new();
        scope.provide(99u32);
        let val: Option<Rc<u32>> = consume_context!(scope, u32);
        assert_eq!(*val.unwrap(), 99);
    }

    #[test]
    fn consume_context_macro_not_found() {
        let scope = TaskScope::new();
        let val: Option<Rc<String>> = consume_context!(scope, String);
        assert!(val.is_none());
    }

    #[cfg(feature = "debug")]
    #[test]
    fn dump_task_tree_returns_string() {
        init();
        let scope = TaskScope::new();
        scope.spawn(async { std::future::pending::<()>().await });

        let output = crate::dump_task_tree();
        assert!(output.contains("Auralis Task Tree"));
        assert!(output.contains("Total active tasks: 1"));
        assert!(output.contains("Scope"));
    }

    #[cfg(feature = "debug")]
    #[test]
    fn dump_task_tree_empty() {
        init();
        let output = crate::dump_task_tree();
        assert!(output.contains("(no active tasks)"));
    }

    use crate::{set_deferred, spawn_global};

    #[test]
    fn suspend_prevents_task_execution() {
        init();
        let scope = TaskScope::new();
        let executed = Rc::new(Cell::new(false));
        let ex = Rc::clone(&executed);
        scope.spawn(async move {
            ex.set(true);
        });
        assert!(executed.get());
        executed.set(false);

        scope.suspend();
        let ex2 = Rc::clone(&executed);
        scope.spawn(async move {
            ex2.set(true);
        });
        assert!(!executed.get());
    }

    #[test]
    fn resume_allows_task_execution() {
        init();
        let scope = TaskScope::new();
        scope.suspend();
        let executed = Rc::new(Cell::new(false));
        let ex = Rc::clone(&executed);
        scope.spawn(async move {
            ex.set(true);
        });
        assert!(!executed.get());

        scope.resume();
        assert!(executed.get());
    }

    #[test]
    fn suspend_cascades_to_children() {
        init();
        let parent = TaskScope::new();
        let child = TaskScope::new_child(&parent);
        assert!(!child.is_suspended());

        parent.suspend();
        assert!(parent.is_suspended());
        assert!(child.is_suspended());
    }

    #[test]
    fn resume_cascades_to_children() {
        init();
        let parent = TaskScope::new();
        let child = TaskScope::new_child(&parent);
        parent.suspend();
        assert!(child.is_suspended());

        parent.resume();
        assert!(!parent.is_suspended());
        assert!(!child.is_suspended());
    }

    #[test]
    fn multiple_suspend_resume_no_leak() {
        init();
        let scope = TaskScope::new();
        for _ in 0..50 {
            scope.suspend();
            assert!(scope.is_suspended());
            scope.resume();
            assert!(!scope.is_suspended());
        }
    }

    #[test]
    fn suspended_scope_drops_without_panic() {
        init();
        {
            let scope = TaskScope::new();
            scope.suspend();
            let d = Rc::new(Cell::new(false));
            struct DropCheck(Rc<Cell<bool>>);
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.0.set(true);
                }
            }
            scope.spawn(async move {
                let _guard = DropCheck(d);
                std::future::pending::<()>().await;
            });
        }
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn siblings_not_affected_by_suspend() {
        init();
        let parent = TaskScope::new();
        let child_a = TaskScope::new_child(&parent);
        let child_b = TaskScope::new_child(&parent);

        child_a.suspend();
        assert!(child_a.is_suspended());
        assert!(!child_b.is_suspended());
        assert!(!parent.is_suspended());
    }

    use crate::Executor;

    #[test]
    fn flush_instance_panicking_task_is_isolated() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));

        let survived = Rc::new(Cell::new(false));
        let s = Rc::clone(&survived);

        Executor::spawn(&ex, async move {
            panic!("intentional test panic in instance executor");
        });
        Executor::spawn(&ex, async move {
            s.set(true);
        });
        Executor::flush_instance(&ex);

        assert!(survived.get());
    }

    #[test]
    fn flush_instance_spawn_and_complete() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));

        let counter = Rc::new(Cell::new(0u32));
        for _ in 0..20 {
            let c = Rc::clone(&counter);
            Executor::spawn(&ex, async move {
                c.set(c.get() + 1);
            });
        }
        Executor::flush_instance(&ex);
        assert_eq!(counter.get(), 20);
    }

    use crate::timer;

    #[test]
    fn timer_zero_duration_completes_immediately() {
        init();
        let done = Rc::new(Cell::new(false));
        let d = Rc::clone(&done);
        spawn_global(async move {
            timer::sleep(Duration::ZERO).await;
            d.set(true);
        });
        assert!(done.get());
    }

    #[test]
    fn timer_normal_delay_fires_after_time_advances() {
        init();
        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(Rc::clone(&ts) as Rc<dyn TimeSource>);

        let done = Rc::new(Cell::new(false));
        let d = Rc::clone(&done);
        spawn_global(async move {
            timer::sleep(Duration::from_millis(100)).await;
            d.set(true);
        });
        assert!(!done.get());

        ts.advance(150);
        crate::executor::flush_all();
        assert!(done.get());
    }

    #[test]
    fn timer_across_multiple_flushes() {
        init();
        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(Rc::clone(&ts) as Rc<dyn TimeSource>);

        let counter = Rc::new(Cell::new(0u32));
        let c = Rc::clone(&counter);
        spawn_global(async move {
            for _ in 0..3 {
                timer::sleep(Duration::from_millis(100)).await;
                c.set(c.get() + 1);
            }
        });
        assert_eq!(counter.get(), 0);

        ts.advance(100);
        crate::executor::flush_all();
        assert_eq!(counter.get(), 1);

        ts.advance(100);
        crate::executor::flush_all();
        assert_eq!(counter.get(), 2);

        ts.advance(100);
        crate::executor::flush_all();
        assert_eq!(counter.get(), 3);
    }

    #[test]
    fn timer_cancelled_by_scope_drop() {
        init();
        let executed = Rc::new(Cell::new(false));
        let ex = Rc::clone(&executed);
        {
            let scope = TaskScope::new();
            scope.spawn(async move {
                timer::sleep(Duration::from_millis(500)).await;
                ex.set(true);
            });
        }
        assert!(!executed.get());
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn reentrant_flush_is_noop() {
        init();
        let reentered = Rc::new(Cell::new(false));
        let r = Rc::clone(&reentered);
        let sig = Signal::new(0);
        auralis_signal::subscribe(&sig, Rc::new(move || r.set(true)));
        sig.set(1);
        assert!(reentered.get());
    }

    #[test]
    fn instance_executor_timer() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));
        let ts = Rc::new(TestTimeSource::new(0));
        Executor::install_time_source(&ex, Rc::clone(&ts) as Rc<dyn TimeSource>);

        let done = Rc::new(Cell::new(false));
        let d = Rc::clone(&done);
        Executor::spawn(&ex, async move {
            timer::sleep(Duration::from_millis(50)).await;
            d.set(true);
        });
        assert!(!done.get());

        ts.advance(60);
        Executor::flush_instance(&ex);
        assert!(done.get());
    }

    #[test]
    fn set_deferred_routes_to_instance_executor() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));

        let sig = Signal::new(0);
        let s = sig.clone();

        Executor::spawn(&ex, async move {
            crate::set_deferred(&s, 42);
        });

        Executor::flush_instance(&ex);
        assert_eq!(sig.read(), 42);
    }

    #[test]
    fn panic_hook_is_invoked_on_task_panic() {
        init();
        let hook_called = Rc::new(Cell::new(false));
        let hc = Rc::clone(&hook_called);

        crate::set_panic_hook(Rc::new(move |_info| {
            hc.set(true);
        }));

        let scope = TaskScope::new();
        scope.spawn(async move { panic!("intentional") });

        assert!(hook_called.get());
    }

    #[test]
    fn current_scope_available_in_spawned_task() {
        init();
        let scope = TaskScope::new();
        let found = Rc::new(Cell::new(false));
        let f = Rc::clone(&found);
        scope.spawn(async move {
            f.set(crate::current_scope().is_some());
        });
        assert!(found.get());
    }

    #[test]
    fn callback_handle_noop_does_not_panic() {
        let _h = crate::CallbackHandle::noop();
    }

    #[test]
    fn sync_callback_fallback_without_schedule_hook() {
        crate::reset_executor_for_test();
        let sig = Signal::new(0);
        let called = Rc::new(Cell::new(false));
        let c = Rc::clone(&called);
        auralis_signal::subscribe(&sig, Rc::new(move || c.set(true)));

        sig.set(1);
        assert!(called.get());
    }

    #[test]
    fn set_deferred_isolated_to_instance_executor() {
        init();
        let ex1 = Executor::new_instance();
        Executor::install_flush_scheduler(&ex1, Rc::new(TestScheduleFlush));
        let ex2 = Executor::new_instance();
        Executor::install_flush_scheduler(&ex2, Rc::new(TestScheduleFlush));

        let sig1 = Signal::new(0);
        let sig2 = Signal::new(0);
        let s1 = sig1.clone();

        crate::with_executor(&ex1, || {
            crate::set_deferred(&s1, 42);
        });
        Executor::flush_instance(&ex1);
        assert_eq!(sig1.read(), 42);
        assert_eq!(sig2.read(), 0);
    }

    #[test]
    fn notify_signal_state_follow_up_handles_reentrant_dirty() {
        let sig = Signal::new(0);
        let sig2 = sig.clone();
        let count = Rc::new(Cell::new(0u32));
        let c = Rc::clone(&count);

        auralis_signal::subscribe(
            &sig,
            Rc::new(move || {
                c.set(c.get() + 1);
                if c.get() == 1 {
                    sig2.set(2);
                }
            }),
        );

        sig.set(1);
        assert_eq!(sig.read(), 2);
        assert_eq!(count.get(), 2);
    }
}