1use std::any::{Any, TypeId};
6use std::cell::{Cell, RefCell};
7use std::collections::{HashMap, VecDeque};
8use std::future::Future;
9use std::pin::Pin;
10use std::rc::{Rc, Weak};
11
12use crate::executor;
13use crate::Priority;
14
/// Identifier assigned to each scope instance.
type ScopeId = u64;
/// Identifier assigned by the executor to each spawned task.
type TaskId = u64;

thread_local! {
    /// Per-thread monotonic counter backing [`alloc_scope_id`]; ids start at 1.
    static NEXT_SCOPE_ID: Cell<ScopeId> = const { Cell::new(1) };
}

/// Returns a fresh, never-before-used scope id for this thread.
fn alloc_scope_id() -> ScopeId {
    NEXT_SCOPE_ID.with(|counter| {
        let current = counter.get();
        counter.set(current + 1);
        current
    })
}
33
/// RAII guard that runs a cleanup closure exactly once when dropped.
///
/// Scopes store these so registered callbacks are torn down when the owning
/// scope is cancelled or dropped.
pub struct CallbackHandle {
    // Cleanup to run on drop; `None` for no-op handles or after it has fired.
    cleanup: Option<Box<dyn FnOnce() + 'static>>,
}

impl CallbackHandle {
    /// Wraps `cleanup` so it runs when the returned handle is dropped.
    ///
    /// Marked `#[must_use]` (matching [`CallbackHandle::noop`]): discarding
    /// the returned handle runs the cleanup immediately, which is almost
    /// never the caller's intent.
    #[must_use]
    pub fn new(cleanup: impl FnOnce() + 'static) -> Self {
        Self {
            cleanup: Some(Box::new(cleanup)),
        }
    }

    /// Returns a handle whose drop does nothing.
    #[must_use]
    pub fn noop() -> Self {
        Self { cleanup: None }
    }
}

impl Drop for CallbackHandle {
    fn drop(&mut self) {
        // `take()` guarantees at-most-once execution of the cleanup.
        if let Some(f) = self.cleanup.take() {
            f();
        }
    }
}
74
/// Registry entry: weak handles to a scope's shared state and suspended
/// flag, so the registry never keeps a dead scope alive on its own.
type ScopeRegistryEntry = (Weak<RefCell<TaskScopeInner>>, Weak<Cell<bool>>);

thread_local! {
    /// Per-thread map from scope id to live scope state; consulted by
    /// [`find_scope`] and cleaned up by `unregister_scope` on cancellation.
    static SCOPE_REGISTRY: RefCell<HashMap<ScopeId, ScopeRegistryEntry>> =
        RefCell::new(HashMap::new());
}
101
102fn register_scope(id: ScopeId, inner: &Rc<RefCell<TaskScopeInner>>, suspended: &Rc<Cell<bool>>) {
105 let _ = SCOPE_REGISTRY.try_with(|reg| {
106 if let Ok(mut r) = reg.try_borrow_mut() {
107 r.insert(id, (Rc::downgrade(inner), Rc::downgrade(suspended)));
108 }
109 });
110}
111
112fn unregister_scope(id: ScopeId) {
113 let _ = SCOPE_REGISTRY.try_with(|reg| {
114 if let Ok(mut r) = reg.try_borrow_mut() {
115 r.remove(&id);
116 }
117 });
118}
119
120#[must_use]
124pub fn find_scope(scope_id: ScopeId) -> Option<TaskScope> {
125 SCOPE_REGISTRY
126 .try_with(|reg| {
127 if let Ok(r) = reg.try_borrow() {
128 r.get(&scope_id).and_then(|(inner_weak, suspended_weak)| {
129 let inner = inner_weak.upgrade()?;
130 let suspended = suspended_weak.upgrade()?;
131 Some(TaskScope { inner, suspended })
132 })
133 } else {
134 None
135 }
136 })
137 .ok()
138 .flatten()
139}
140
/// Returns the debug label of a live scope, if one was set via
/// `TaskScope::set_debug_label`.
#[cfg(feature = "debug")]
#[doc(hidden)]
#[must_use]
pub fn scope_debug_label(scope_id: ScopeId) -> Option<String> {
    let scope = find_scope(scope_id)?;
    scope.inner.borrow().debug_label.clone()
}
150
151#[doc(hidden)]
153pub fn clear_scope_registry() {
154 let _ = SCOPE_REGISTRY.try_with(|reg| {
155 if let Ok(mut r) = reg.try_borrow_mut() {
156 r.clear();
157 }
158 });
159}
160
/// Installs (or clears, with `None`) the current scope for the active
/// execution context.
type ScopeSetFn = fn(Option<TaskScope>);
/// Reads the current scope for the active execution context.
type ScopeGetFn = fn() -> Option<TaskScope>;

/// Pluggable storage strategy for the "current scope" pointer.
///
/// Defaults to plain thread-local storage; `init_scope_store_tokio` swaps
/// in a tokio task-local implementation for SSR.
pub struct ScopeStore {
    /// Writes the current scope.
    pub set_fn: ScopeSetFn,
    /// Reads the current scope.
    pub get_fn: ScopeGetFn,
}
185
use std::sync::OnceLock;
/// Process-global scope store; initialised at most once.
static SCOPE_STORE: OnceLock<ScopeStore> = OnceLock::new();

/// Returns the installed store, lazily falling back to the thread-local
/// implementation on first access.
fn ensure_default_store() -> &'static ScopeStore {
    SCOPE_STORE.get_or_init(|| ScopeStore {
        set_fn: thread_local_set,
        get_fn: thread_local_get,
    })
}

/// Installs a custom scope store.
///
/// NOTE(review): `OnceLock::set` fails once initialised, so any call after
/// the first — or after any scope access has triggered the lazy default —
/// is silently ignored ("first writer wins"). Confirm this is intended.
pub fn set_scope_store(store: ScopeStore) {
    let _ = SCOPE_STORE.set(store);
}
214
thread_local! {
    /// Default backing storage for the current scope when no custom
    /// [`ScopeStore`] has been installed.
    static CURRENT_SCOPE: RefCell<Option<TaskScope>> = const { RefCell::new(None) };
}

/// Thread-local implementation of [`ScopeSetFn`].
fn thread_local_set(scope: Option<TaskScope>) {
    CURRENT_SCOPE.with(|cell| {
        // `RefCell::replace` releases its borrow before returning, so the
        // displaced old value is dropped as a temporary *after* the borrow
        // ends — important because dropping a TaskScope can run arbitrary
        // cleanup code that may itself read CURRENT_SCOPE.
        cell.replace(scope);
    });
}

/// Thread-local implementation of [`ScopeGetFn`].
fn thread_local_get() -> Option<TaskScope> {
    CURRENT_SCOPE.with(|cell| cell.borrow().clone())
}
254
255pub(crate) fn set_scope_direct(scope: Option<TaskScope>) {
260 let store = ensure_default_store();
261 (store.set_fn)(scope);
262}
263
264pub(crate) fn get_scope_direct() -> Option<TaskScope> {
266 let store = ensure_default_store();
267 (store.get_fn)()
268}
269
/// Routes current-scope storage through a tokio task-local so each tokio
/// task gets its own scope slot (used for SSR).
///
/// NOTE(review): `set_scope_store` is first-writer-wins; calling this after
/// any scope access (which lazily installs the thread-local store) is a
/// silent no-op — confirm initialisation order at the call sites.
#[cfg(feature = "ssr-tokio")]
pub fn init_scope_store_tokio() {
    tokio::task_local! {
        static TK_SCOPE: std::cell::RefCell<Option<TaskScope>>;
    }

    // Best-effort reset when called from inside a task that already has the
    // task-local initialised; outside a task `try_with` fails and is ignored.
    let _ = TK_SCOPE.try_with(|cell| {
        cell.replace(None);
    });

    set_scope_store(ScopeStore {
        // Writes are silently dropped when no task-local context exists
        // (e.g. outside the tokio runtime).
        set_fn: |s| {
            let _ = TK_SCOPE.try_with(|cell| {
                cell.replace(s);
            });
        },
        // Reads outside a task-local context yield `None`.
        get_fn: || {
            TK_SCOPE
                .try_with(|cell| cell.borrow().clone())
                .ok()
                .flatten()
        },
    });
}
310
311pub fn with_current_scope<R>(scope: &TaskScope, f: impl FnOnce() -> R) -> R {
321 let store = ensure_default_store();
322 let prev = (store.get_fn)();
323 (store.set_fn)(Some(scope.clone_inner()));
324 let result = f();
325 (store.set_fn)(prev);
326 result
327}
328
329#[must_use]
331pub fn current_scope() -> Option<TaskScope> {
332 let store = ensure_default_store();
333 (store.get_fn)()
334}
335
/// Shared mutable state behind a [`TaskScope`] handle.
struct TaskScopeInner {
    /// Unique id; also the key into the thread-local scope registry.
    id: ScopeId,
    /// Ids of tasks spawned on this scope, used for cancellation.
    task_ids: Vec<TaskId>,
    /// Child scopes; cancelled, suspended and resumed together with this one.
    children: Vec<TaskScope>,
    /// Weak back link to the parent, walked by `consume` for context lookup.
    parent: Option<Weak<RefCell<TaskScopeInner>>>,
    /// Type-keyed context values registered via `provide`.
    context: RefCell<HashMap<TypeId, Rc<dyn Any>>>,
    /// Cleanup handles dropped (and therefore run) on cancellation.
    callbacks: RefCell<Vec<CallbackHandle>>,
    /// Optional human-readable label for debug dumps.
    #[cfg(feature = "debug")]
    debug_label: Option<String>,
    /// Set once when the scope is cancelled; guards against double-cancel.
    cancelled: bool,
}
355
/// Handle to a cancellation scope for spawned tasks.
///
/// Cloning is cheap (two `Rc` bumps) and all clones share the same state.
/// NOTE(review): the `Drop` impl cancels the scope whenever *any* handle is
/// dropped (not only the last one), so temporary clones are hazardous —
/// see the review notes on `Drop` and on `enter`/`suspend`.
#[must_use]
pub struct TaskScope {
    /// Shared scope state (tasks, children, context, callbacks).
    inner: Rc<RefCell<TaskScopeInner>>,
    /// Shared suspended flag, also exposed weakly through the registry.
    suspended: Rc<Cell<bool>>,
}
390
impl TaskScope {
    /// Creates a new root scope and registers it in the per-thread scope
    /// registry so it can be resolved again via [`find_scope`].
    pub fn new() -> Self {
        let inner = Rc::new(RefCell::new(TaskScopeInner {
            id: alloc_scope_id(),
            task_ids: Vec::new(),
            children: Vec::new(),
            parent: None,
            context: RefCell::new(HashMap::new()),
            callbacks: RefCell::new(Vec::new()),
            #[cfg(feature = "debug")]
            debug_label: None,
            cancelled: false,
        }));
        let id = inner.borrow().id;
        let suspended = Rc::new(Cell::new(false));
        register_scope(id, &inner, &suspended);
        Self { inner, suspended }
    }

    /// Creates a child scope of `parent`.
    ///
    /// The parent holds a strong handle to the child (so cancelling the
    /// parent cancels the child), while the child only keeps a `Weak` back
    /// link, used by `consume` to walk up the context chain.
    pub fn new_child(parent: &Self) -> Self {
        let inner = Rc::new(RefCell::new(TaskScopeInner {
            id: alloc_scope_id(),
            task_ids: Vec::new(),
            children: Vec::new(),
            parent: Some(Rc::downgrade(&parent.inner)),
            context: RefCell::new(HashMap::new()),
            callbacks: RefCell::new(Vec::new()),
            #[cfg(feature = "debug")]
            debug_label: None,
            cancelled: false,
        }));
        let id = inner.borrow().id;
        let suspended = Rc::new(Cell::new(false));
        register_scope(id, &inner, &suspended);
        let child = Self { inner, suspended };
        parent.inner.borrow_mut().children.push(child.clone_inner());
        child
    }

    /// Spawns `future` on this scope at [`Priority::Low`].
    pub fn spawn(&self, future: impl Future<Output = ()> + 'static) {
        self.spawn_with_priority(Priority::Low, future);
    }

    /// Spawns `future` on this scope with an explicit priority; silently a
    /// no-op if the scope has already been cancelled.
    ///
    /// NOTE(review): `inner` stays mutably borrowed across the call into
    /// `executor::spawn_scoped`, which may poll the future synchronously.
    /// A task that touches this scope's inner state on its first poll will
    /// hit a RefCell borrow conflict. The held borrow is also what steers
    /// the drop of the temporary current-scope clone (installed by
    /// `with_current_scope`) into `Drop`'s try_borrow_mut bail-out rather
    /// than cancelling this scope — reordering here is behaviour-sensitive.
    pub fn spawn_with_priority(
        &self,
        priority: Priority,
        future: impl Future<Output = ()> + 'static,
    ) {
        let mut inner = self.inner.borrow_mut();
        if inner.cancelled {
            return;
        }
        let task_id =
            with_current_scope(self, || executor::spawn_scoped(priority, inner.id, future));
        inner.task_ids.push(task_id);
    }

    /// Ties a [`CallbackHandle`] to this scope so its cleanup runs when the
    /// scope is cancelled. If the scope is already cancelled, the handle is
    /// dropped here, running its cleanup immediately.
    pub fn register_callback_handle(&self, handle: CallbackHandle) {
        let inner = self.inner.borrow();
        if inner.cancelled {
            return;
        }
        inner.callbacks.borrow_mut().push(handle);
    }

    /// Stores `value` in this scope's context map keyed by its `TypeId`,
    /// replacing any previous value of the same type.
    pub fn provide<T: 'static>(&self, value: T) {
        self.inner
            .borrow()
            .context
            .borrow_mut()
            .insert(TypeId::of::<T>(), Rc::new(value));
    }

    /// Looks up a context value of type `T` on this scope, then walks the
    /// parent chain until a value is found or the root is reached.
    #[must_use]
    pub fn consume<T: 'static>(&self) -> Option<Rc<T>> {
        let mut current = Some(Rc::clone(&self.inner));

        while let Some(inner) = current {
            {
                // Scoped so both borrows are released before moving on.
                let inner_ref = inner.borrow();
                let ctx = inner_ref.context.borrow();
                if let Some(val) = ctx.get(&TypeId::of::<T>()) {
                    if let Ok(downcast) = val.clone().downcast::<T>() {
                        return Some(downcast);
                    }
                }
            }

            let parent = {
                let inner_ref = inner.borrow();
                inner_ref.parent.as_ref().and_then(Weak::upgrade)
            };
            current = parent;
        }

        None
    }

    /// Like [`Self::consume`] but panics (with the missing type's name) if
    /// no value is found anywhere in the chain.
    #[must_use]
    #[track_caller]
    pub fn expect_context<T: 'static>(&self) -> Rc<T> {
        self.consume::<T>()
            .unwrap_or_else(|| panic!("context not found: {}", std::any::type_name::<T>()))
    }

    /// Sets a human-readable label shown in debug dumps.
    #[cfg(feature = "debug")]
    pub fn set_debug_label(&self, label: impl Into<String>) {
        self.inner.borrow_mut().debug_label = Some(label.into());
    }

    /// Number of tasks ever spawned on this scope (test-only helper).
    #[cfg(test)]
    #[must_use]
    pub fn task_count(&self) -> usize {
        self.inner.borrow().task_ids.len()
    }

    /// Number of direct children (test-only helper).
    #[cfg(test)]
    #[must_use]
    pub fn child_count(&self) -> usize {
        self.inner.borrow().children.len()
    }

    /// Cheap handle copy sharing the same inner state (two `Rc` bumps).
    fn clone_inner(&self) -> Self {
        Self {
            inner: Rc::clone(&self.inner),
            suspended: Rc::clone(&self.suspended),
        }
    }

    /// Runs `f` with this scope installed as the current scope.
    ///
    /// NOTE(review): `with_current_scope` drops a temporary clone of this
    /// scope on exit while no `inner` borrow is held; since `Drop` cancels
    /// on any handle drop, `enter` appears to cancel this scope after `f`
    /// returns (contrast `spawn_with_priority`, where the held `borrow_mut`
    /// diverts that drop into the bail-out path). Confirm intended.
    pub fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
        with_current_scope(self, f)
    }

    /// Marks this scope and, recursively, all children as suspended. Only
    /// the shared flag is set here; it is exposed through the registry for
    /// the executor to consult.
    ///
    /// NOTE(review): the `children` snapshot below is a Vec of temporary
    /// `TaskScope` clones. When it drops at the end of this function,
    /// `TaskScope::drop` runs for each clone and — since those children are
    /// not yet cancelled — appears to cancel them. The suspend tests only
    /// assert flags, so this would go unnoticed; verify.
    pub fn suspend(&self) {
        if self.suspended.get() {
            return;
        }
        self.suspended.set(true);
        let children: Vec<TaskScope> = {
            // Clone out of `inner` first so no borrow is held while
            // recursing into the children.
            self.inner
                .borrow()
                .children
                .iter()
                .map(TaskScope::clone_inner)
                .collect()
        };
        for child in &children {
            child.suspend();
        }
    }

    /// Clears the suspended flag, re-enqueues this scope's tasks with the
    /// executor, then resumes all children recursively.
    ///
    /// NOTE(review): same temporary-clone drop concern as in `suspend`.
    pub fn resume(&self) {
        if !self.suspended.get() {
            return;
        }
        self.suspended.set(false);

        let (scope_id, children) = {
            let inner = self.inner.borrow();
            let id = inner.id;
            let children: Vec<TaskScope> =
                inner.children.iter().map(TaskScope::clone_inner).collect();
            (id, children)
        };

        executor::enqueue_scope_tasks(scope_id);

        for child in &children {
            child.resume();
        }
    }

    /// Whether this scope is currently suspended.
    #[must_use]
    pub fn is_suspended(&self) -> bool {
        self.suspended.get()
    }
}
637
638impl Default for TaskScope {
639 fn default() -> Self {
640 Self::new()
641 }
642}
643
644impl Clone for TaskScope {
645 fn clone(&self) -> Self {
646 self.clone_inner()
647 }
648}
649
impl Drop for TaskScope {
    /// Cancels the scope: runs registered callback cleanups, cancels this
    /// scope's and every descendant's tasks, clears context maps, and
    /// removes the scopes from the registry.
    ///
    /// NOTE(review): this runs for EVERY dropped handle, clones included —
    /// the `cancelled` flag only makes it idempotent. Temporary clones
    /// (e.g. `suspend`/`resume` child snapshots, or the clone installed by
    /// `with_current_scope`) therefore cancel the scope when they drop,
    /// unless a live borrow of `inner` forces the bail-out below. Confirm
    /// this is intended.
    fn drop(&mut self) {
        // Bail out (leaking tasks/callbacks) rather than panic when `inner`
        // is already borrowed, e.g. a drop occurring mid-spawn or during a
        // flush.
        let Ok(mut inner) = self.inner.try_borrow_mut() else {
            #[cfg(debug_assertions)]
            {
                eprintln!(
                    "[auralis-task] WARNING: TaskScope::drop cannot borrow inner \
                    (already borrowed). If this was the last clone, tasks and \
                    callbacks will leak. Avoid dropping the last TaskScope clone \
                    inside a callback or during executor flush."
                );
            }
            return;
        };
        if inner.cancelled {
            return;
        }
        inner.cancelled = true;

        // Callbacks first: cleanups run before any task future is dropped
        // (see the `callback_handle_dropped_before_tasks` test).
        inner.callbacks.borrow_mut().clear();

        // Collect all descendants iteratively (BFS) instead of recursing,
        // so deep trees cannot overflow the stack (see the
        // `deeply_nested_scope_drop_no_stack_overflow` test).
        let mut descendants: Vec<Rc<RefCell<TaskScopeInner>>> = Vec::new();
        {
            let mut queue: VecDeque<Rc<RefCell<TaskScopeInner>>> = VecDeque::new();
            for child in &inner.children {
                queue.push_back(Rc::clone(&child.inner));
            }

            while let Some(scope_rc) = queue.pop_front() {
                let scope = scope_rc.borrow();
                for child in &scope.children {
                    queue.push_back(Rc::clone(&child.inner));
                }
                descendants.push(Rc::clone(&scope_rc));
            }
        }

        // Cancel deepest-first (reverse BFS order) so children are torn
        // down before their parents.
        for scope_rc in descendants.iter().rev() {
            let mut scope = scope_rc.borrow_mut();
            if scope.cancelled {
                continue;
            }
            scope.cancelled = true;

            // Same ordering per scope: callbacks before task futures.
            scope.callbacks.borrow_mut().clear();

            if !scope.task_ids.is_empty() {
                // Futures are dropped while `scope` is mutably borrowed;
                // their destructors must not touch this scope's inner state
                // directly (they would hit the try_borrow_mut bail-out).
                let dropped_futures: Vec<Pin<Box<dyn Future<Output = ()>>>> =
                    executor::cancel_scope_tasks(scope.id);
                drop(dropped_futures);
            }
            scope.context.borrow_mut().clear();

            unregister_scope(scope.id);
        }

        // Finally cancel this scope's own tasks and detach everything.
        if !inner.task_ids.is_empty() {
            let dropped_futures = executor::cancel_scope_tasks(inner.id);
            drop(dropped_futures);
        }

        inner.context.borrow_mut().clear();
        // Dropping the child handles here is safe: each child was marked
        // `cancelled` above, so their Drop impls return early.
        inner.children.clear();

        unregister_scope(inner.id);
    }
}
737
/// Convenience wrapper for [`TaskScope::provide`]:
/// `provide_context!(scope, value)`.
#[macro_export]
macro_rules! provide_context {
    ($scope:expr, $value:expr) => {
        $scope.provide($value)
    };
}
753
/// Convenience wrapper for [`TaskScope::consume`]:
/// `consume_context!(scope, Type)` yields `Option<Rc<Type>>`.
#[macro_export]
macro_rules! consume_context {
    ($scope:expr, $ty:ty) => {
        $scope.consume::<$ty>()
    };
}
765
#[cfg(test)]
#[allow(clippy::items_after_statements)]
mod tests {
    use super::*;
    use crate::executor::{self, init_flush_scheduler, reset_executor_for_test, TestScheduleFlush};
    use crate::{init_time_source, ScheduleFlush, TestTimeSource, TimeSource};
    use auralis_signal::Signal;
    use std::cell::{Cell, RefCell};
    use std::rc::Rc;
    use std::time::Duration;

    /// Resets global executor state and installs the synchronous test flush
    /// scheduler; call at the top of every test that spawns tasks.
    fn init() {
        reset_executor_for_test();
        init_flush_scheduler(Rc::new(TestScheduleFlush));
    }

    // --- scope construction & basic spawning ---

    #[test]
    fn new_scope_has_zero_tasks() {
        let scope = TaskScope::new();
        assert_eq!(scope.task_count(), 0);
        assert_eq!(scope.child_count(), 0);
    }

    #[test]
    fn new_child_attaches_to_parent() {
        let parent = TaskScope::new();
        let _child = TaskScope::new_child(&parent);
        assert_eq!(parent.child_count(), 1);
    }

    #[test]
    fn spawn_adds_task() {
        init();
        let scope = TaskScope::new();
        scope.spawn(async {});
        assert_eq!(scope.task_count(), 1);
    }

    #[test]
    fn spawn_and_complete() {
        init();
        let done = Rc::new(Cell::new(false));
        let done2 = Rc::clone(&done);
        spawn_global(async move {
            done2.set(true);
        });
        // TestScheduleFlush runs tasks synchronously, so this is observable
        // immediately after spawn.
        assert!(done.get());
    }
820
    // --- cancellation when scopes drop ---

    #[test]
    fn scope_spawn_and_cancel() {
        init();
        let dropped = Rc::new(Cell::new(false));
        {
            let scope = TaskScope::new();
            let d = Rc::clone(&dropped);
            struct DropCheck(Rc<Cell<bool>>);
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.0.set(true);
                }
            }
            scope.spawn(async move {
                let _guard = DropCheck(d);
                std::future::pending::<()>().await;
            });
            assert_eq!(executor::debug_task_count(), 1);
        }
        // Scope drop must cancel (drop) the pending future.
        assert!(dropped.get());
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn nested_scope_child_cancel_with_parent() {
        init();
        let dropped_child = Rc::new(Cell::new(false));
        {
            let parent = TaskScope::new();
            let child = TaskScope::new_child(&parent);
            let d = Rc::clone(&dropped_child);
            struct DropCheck(Rc<Cell<bool>>);
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.0.set(true);
                }
            }
            child.spawn(async move {
                let _guard = DropCheck(d);
                std::future::pending::<()>().await;
            });
            assert_eq!(executor::debug_task_count(), 1);
        }
        assert!(dropped_child.get());
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn deeply_nested_scope_drop_no_stack_overflow() {
        init();
        let root = TaskScope::new();
        {
            // 200 levels deep: exercises the iterative (BFS) teardown.
            let mut current = TaskScope::new_child(&root);
            for _ in 0..199 {
                current = TaskScope::new_child(&current);
            }
        }
        drop(root);
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn scope_child_explicit_tree() {
        let root = TaskScope::new();
        let a = TaskScope::new_child(&root);
        let b = TaskScope::new_child(&root);
        let _a1 = TaskScope::new_child(&a);
        let _a2 = TaskScope::new_child(&a);
        assert_eq!(root.child_count(), 2);
        assert_eq!(a.child_count(), 2);
        assert_eq!(b.child_count(), 0);
    }
893
    // --- callback cleanup ordering & context propagation ---

    #[test]
    fn callback_handle_dropped_before_tasks() {
        init();
        let dropped_order: Rc<RefCell<Vec<String>>> = Rc::new(RefCell::new(Vec::new()));
        {
            let scope = TaskScope::new();
            let order1 = Rc::clone(&dropped_order);
            scope.register_callback_handle(CallbackHandle::new(move || {
                order1.borrow_mut().push("callback".to_string());
            }));
            let order2 = Rc::clone(&dropped_order);
            struct DropCheck {
                order: Rc<RefCell<Vec<String>>>,
                label: String,
            }
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.order.borrow_mut().push(self.label.clone());
                }
            }
            scope.spawn(async move {
                let _guard = DropCheck {
                    order: order2,
                    label: "task".to_string(),
                };
                std::future::pending::<()>().await;
            });
        }
        // Drop impl guarantees callbacks run before task futures drop.
        let order = dropped_order.borrow().clone();
        assert_eq!(order, vec!["callback", "task"]);
    }

    #[test]
    fn callback_handle_cleaned_up_on_child_scope_drop() {
        init();
        let called = Rc::new(Cell::new(false));
        {
            let parent = TaskScope::new();
            let child = TaskScope::new_child(&parent);
            let c = Rc::clone(&called);
            child.register_callback_handle(CallbackHandle::new(move || {
                c.set(true);
            }));
        }
        assert!(called.get());
    }

    #[test]
    fn context_provide_and_consume_in_same_scope() {
        let scope = TaskScope::new();
        scope.provide(42i32);
        assert_eq!(*scope.consume::<i32>().unwrap(), 42);
    }

    #[test]
    fn context_consume_walks_up_to_parent() {
        let parent = TaskScope::new();
        parent.provide("hello".to_string());
        let child = TaskScope::new_child(&parent);
        assert_eq!(*child.consume::<String>().unwrap(), "hello");
    }

    #[test]
    fn context_consume_not_found() {
        let scope = TaskScope::new();
        assert!(scope.consume::<i32>().is_none());
    }

    #[test]
    fn context_removed_on_scope_drop() {
        let parent = TaskScope::new();
        parent.provide(99u32);
        {
            let _child = TaskScope::new_child(&parent);
        }
        // Parent's own context must survive a child's drop.
        assert_eq!(*parent.consume::<u32>().unwrap(), 99);
    }

    #[test]
    fn context_shadowing() {
        let parent = TaskScope::new();
        parent.provide(1i32);
        let child = TaskScope::new_child(&parent);
        child.provide(2i32);
        assert_eq!(*child.consume::<i32>().unwrap(), 2);
        assert_eq!(*parent.consume::<i32>().unwrap(), 1);
    }

    #[test]
    #[should_panic(expected = "context not found")]
    fn expect_context_panics_when_missing() {
        let scope = TaskScope::new();
        let _ = scope.expect_context::<String>();
    }
997
    // --- executor ordering/batching & deferred signal writes ---

    #[test]
    fn executor_priority_ordering() {
        init();
        let order = Rc::new(RefCell::new(Vec::new()));
        let o1 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::Low, async move {
            o1.borrow_mut().push("low");
        });
        let o2 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::High, async move {
            o2.borrow_mut().push("high");
        });
        executor::flush_all();
        let result = order.borrow().clone();
        // High priority runs first regardless of spawn order.
        assert_eq!(result, vec!["high", "low"]);
    }

    #[test]
    fn executor_batch() {
        init();
        let counter = Rc::new(Cell::new(0u32));
        for _ in 0..10 {
            let c = Rc::clone(&counter);
            spawn_global(async move {
                c.set(c.get() + 1);
            });
        }
        assert_eq!(counter.get(), 10);
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn no_leak_on_cancel() {
        init();
        for _ in 0..50 {
            let scope = TaskScope::new();
            for _ in 0..5 {
                scope.spawn(std::future::pending::<()>());
            }
        }
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn set_deferred_triggers_after_flush() {
        use auralis_signal::Signal;
        init();
        let sig = Signal::new(0);
        let observed = Rc::new(Cell::new(0));
        set_deferred(&sig, 42);
        assert_eq!(sig.read(), 42);
        let ob1 = Rc::clone(&observed);
        spawn_global(async move {
            ob1.set(sig.read());
        });
        assert_eq!(observed.get(), 42);
    }

    #[test]
    fn set_deferred_in_drop_safe() {
        use auralis_signal::Signal;
        init();
        let sig = Signal::new(0);
        struct SetOnDrop {
            sig: Signal<i32>,
            val: i32,
        }
        impl Drop for SetOnDrop {
            fn drop(&mut self) {
                set_deferred(&self.sig, self.val);
            }
        }
        let guard = SetOnDrop {
            sig: sig.clone(),
            val: 99,
        };
        drop(guard);
        assert_eq!(sig.read(), 99);
    }

    #[test]
    fn set_deferred_from_drop_guard_during_scope_cancel() {
        use auralis_signal::Signal;
        init();

        let sig = Signal::new(0i32);

        // Guard that writes the signal from inside cancellation teardown.
        struct ResetOnDrop {
            sig: Signal<i32>,
        }
        impl Drop for ResetOnDrop {
            fn drop(&mut self) {
                set_deferred(&self.sig, 42);
            }
        }

        {
            let scope = TaskScope::new();
            let s = sig.clone();
            scope.spawn(async move {
                let _guard = ResetOnDrop { sig: s };
                std::future::pending::<()>().await;
            });
        }

        assert_eq!(
            sig.read(),
            42,
            "set_deferred should have fired after scope cancel"
        );
    }
1118
    // --- cooperative yielding, panic isolation, time-budgeted flushing ---

    #[test]
    fn yield_now_gives_other_tasks_a_turn() {
        init();
        let order = Rc::new(RefCell::new(Vec::new()));
        let o1 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::Low, async move {
            o1.borrow_mut().push("a1");
            executor::yield_now().await;
            o1.borrow_mut().push("a2");
        });
        let o2 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::Low, async move {
            o2.borrow_mut().push("b1");
            o2.borrow_mut().push("b2");
        });
        executor::flush_all();
        let r = order.borrow().clone();
        // Task A yields after "a1", letting B run to completion before "a2".
        assert_eq!(&r[0..3], &["a1", "b1", "b2"][..]);
        assert!(r.contains(&"a2"));
    }

    #[test]
    fn panic_in_task_is_isolated() {
        init();
        let survived = Rc::new(Cell::new(false));
        let s = Rc::clone(&survived);
        spawn_global(async move {
            panic!("intentional test panic");
        });
        spawn_global(async move {
            s.set(true);
        });
        assert!(survived.get());
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn time_budget_with_test_time_source() {
        init();
        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(ts.clone());

        let polled = Rc::new(Cell::new(0u32));

        // Each task advances fake time by 1ms; with the test scheduler all
        // 50 still complete across (re-)scheduled flushes.
        for _ in 0..50 {
            let pc = Rc::clone(&polled);
            let ts_c = Rc::clone(&ts);
            executor::spawn_no_auto_flush(Priority::Low, async move {
                pc.set(pc.get() + 1);
                ts_c.advance(1);
            });
        }

        executor::flush_all();

        assert_eq!(polled.get(), 50);
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn time_budget_honoured_with_split() {
        // Scheduler that only counts schedule requests and never runs them,
        // so the first flush's budget split is observable.
        let schedule_count = Rc::new(Cell::new(0u32));
        struct NoopScheduleFlush(Rc<Cell<u32>>);
        impl ScheduleFlush for NoopScheduleFlush {
            fn schedule(&self, _callback: Box<dyn FnOnce()>) {
                self.0.set(self.0.get() + 1);
            }
        }
        init_flush_scheduler(Rc::new(NoopScheduleFlush(Rc::clone(&schedule_count))));

        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(ts.clone());

        let polled = Rc::new(RefCell::new(Vec::new()));

        for i in 0..50u32 {
            let pc = Rc::clone(&polled);
            let ts_c = Rc::clone(&ts);
            executor::spawn_no_auto_flush(Priority::Low, async move {
                pc.borrow_mut().push(i);
                ts_c.advance(1);
            });
        }

        executor::flush_all();

        let completed = polled.borrow().len();
        assert!(
            completed < 50,
            "budget should split before all tasks run (only {completed} of 50)"
        );
        assert!(
            completed >= 7,
            "at least 7 tasks should run before budget expires ({completed})"
        );
        assert_eq!(
            schedule_count.get(),
            1,
            "next flush should have been scheduled exactly once"
        );

        // Restore the synchronous scheduler and drain the remainder.
        init_flush_scheduler(Rc::new(TestScheduleFlush));
        executor::flush_all();
        assert_eq!(executor::debug_task_count(), 0);
    }
1238
    // --- context macros, debug dump, suspend/resume ---

    #[test]
    fn provide_context_macro_works() {
        let scope = TaskScope::new();
        provide_context!(scope, 42i32);
        assert_eq!(*scope.consume::<i32>().unwrap(), 42);
    }

    #[test]
    fn consume_context_macro_works() {
        let scope = TaskScope::new();
        scope.provide(99u32);
        let val: Option<Rc<u32>> = consume_context!(scope, u32);
        assert_eq!(*val.unwrap(), 99);
    }

    #[test]
    fn consume_context_macro_not_found() {
        let scope = TaskScope::new();
        let val: Option<Rc<String>> = consume_context!(scope, String);
        assert!(val.is_none());
    }

    #[cfg(feature = "debug")]
    #[test]
    fn dump_task_tree_returns_string() {
        init();
        let scope = TaskScope::new();
        scope.spawn(async { std::future::pending::<()>().await });

        let output = crate::dump_task_tree();
        assert!(output.contains("Auralis Task Tree"));
        assert!(output.contains("Total active tasks: 1"));
        assert!(output.contains("Scope"));
    }

    #[cfg(feature = "debug")]
    #[test]
    fn dump_task_tree_empty() {
        init();
        let output = crate::dump_task_tree();
        assert!(output.contains("(no active tasks)"));
    }

    use crate::{set_deferred, spawn_global};

    #[test]
    fn suspend_prevents_task_execution() {
        init();
        let scope = TaskScope::new();
        let executed = Rc::new(Cell::new(false));
        let ex = Rc::clone(&executed);
        scope.spawn(async move {
            ex.set(true);
        });
        // Not suspended yet: task ran synchronously.
        assert!(executed.get());
        executed.set(false);

        scope.suspend();
        let ex2 = Rc::clone(&executed);
        scope.spawn(async move {
            ex2.set(true);
        });
        // Suspended: the new task must be parked, not run.
        assert!(!executed.get());
    }

    #[test]
    fn resume_allows_task_execution() {
        init();
        let scope = TaskScope::new();
        scope.suspend();
        let executed = Rc::new(Cell::new(false));
        let ex = Rc::clone(&executed);
        scope.spawn(async move {
            ex.set(true);
        });
        assert!(!executed.get());

        scope.resume();
        assert!(executed.get());
    }

    #[test]
    fn suspend_cascades_to_children() {
        init();
        let parent = TaskScope::new();
        let child = TaskScope::new_child(&parent);
        assert!(!child.is_suspended());

        parent.suspend();
        assert!(parent.is_suspended());
        assert!(child.is_suspended());
    }

    #[test]
    fn resume_cascades_to_children() {
        init();
        let parent = TaskScope::new();
        let child = TaskScope::new_child(&parent);
        parent.suspend();
        assert!(child.is_suspended());

        parent.resume();
        assert!(!parent.is_suspended());
        assert!(!child.is_suspended());
    }

    #[test]
    fn multiple_suspend_resume_no_leak() {
        init();
        let scope = TaskScope::new();
        for _ in 0..50 {
            scope.suspend();
            assert!(scope.is_suspended());
            scope.resume();
            assert!(!scope.is_suspended());
        }
    }
1366
    #[test]
    fn suspended_scope_drops_without_panic() {
        init();
        {
            let scope = TaskScope::new();
            scope.suspend();
            let d = Rc::new(Cell::new(false));
            struct DropCheck(Rc<Cell<bool>>);
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.0.set(true);
                }
            }
            scope.spawn(async move {
                let _guard = DropCheck(d);
                std::future::pending::<()>().await;
            });
        }
        // Parked tasks must still be cancelled when a suspended scope drops.
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn siblings_not_affected_by_suspend() {
        init();
        let parent = TaskScope::new();
        let child_a = TaskScope::new_child(&parent);
        let child_b = TaskScope::new_child(&parent);

        child_a.suspend();
        assert!(child_a.is_suspended());
        assert!(!child_b.is_suspended());
        assert!(!parent.is_suspended());
    }

    use crate::Executor;

    // --- per-instance executors ---

    #[test]
    fn flush_instance_panicking_task_is_isolated() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));

        let survived = Rc::new(Cell::new(false));
        let s = Rc::clone(&survived);

        Executor::spawn(&ex, async move {
            panic!("intentional test panic in instance executor");
        });
        Executor::spawn(&ex, async move {
            s.set(true);
        });
        Executor::flush_instance(&ex);

        assert!(survived.get());
    }

    #[test]
    fn flush_instance_spawn_and_complete() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));

        let counter = Rc::new(Cell::new(0u32));
        for _ in 0..20 {
            let c = Rc::clone(&counter);
            Executor::spawn(&ex, async move {
                c.set(c.get() + 1);
            });
        }
        Executor::flush_instance(&ex);
        assert_eq!(counter.get(), 20);
    }
1444
    use crate::timer;

    // --- timers (driven by the fake TestTimeSource) ---

    #[test]
    fn timer_zero_duration_completes_immediately() {
        init();
        let done = Rc::new(Cell::new(false));
        let d = Rc::clone(&done);
        spawn_global(async move {
            timer::sleep(Duration::ZERO).await;
            d.set(true);
        });
        assert!(done.get());
    }

    #[test]
    fn timer_normal_delay_fires_after_time_advances() {
        init();
        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(Rc::clone(&ts) as Rc<dyn TimeSource>);

        let done = Rc::new(Cell::new(false));
        let d = Rc::clone(&done);
        spawn_global(async move {
            timer::sleep(Duration::from_millis(100)).await;
            d.set(true);
        });
        assert!(!done.get());

        ts.advance(150);
        crate::executor::flush_all();
        assert!(done.get());
    }

    #[test]
    fn timer_across_multiple_flushes() {
        init();
        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(Rc::clone(&ts) as Rc<dyn TimeSource>);

        let counter = Rc::new(Cell::new(0u32));
        let c = Rc::clone(&counter);
        spawn_global(async move {
            for _ in 0..3 {
                timer::sleep(Duration::from_millis(100)).await;
                c.set(c.get() + 1);
            }
        });
        assert_eq!(counter.get(), 0);

        // Each 100ms advance wakes exactly one sleep iteration.
        ts.advance(100);
        crate::executor::flush_all();
        assert_eq!(counter.get(), 1);

        ts.advance(100);
        crate::executor::flush_all();
        assert_eq!(counter.get(), 2);

        ts.advance(100);
        crate::executor::flush_all();
        assert_eq!(counter.get(), 3);
    }

    #[test]
    fn timer_cancelled_by_scope_drop() {
        init();
        let executed = Rc::new(Cell::new(false));
        let ex = Rc::clone(&executed);
        {
            let scope = TaskScope::new();
            scope.spawn(async move {
                timer::sleep(Duration::from_millis(500)).await;
                ex.set(true);
            });
        }
        assert!(!executed.get());
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn reentrant_flush_is_noop() {
        init();
        let reentered = Rc::new(Cell::new(false));
        let r = Rc::clone(&reentered);
        let sig = Signal::new(0);
        auralis_signal::subscribe(&sig, Rc::new(move || r.set(true)));
        sig.set(1);
        assert!(reentered.get());
    }

    #[test]
    fn instance_executor_timer() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));
        let ts = Rc::new(TestTimeSource::new(0));
        Executor::install_time_source(&ex, Rc::clone(&ts) as Rc<dyn TimeSource>);

        let done = Rc::new(Cell::new(false));
        let d = Rc::clone(&done);
        Executor::spawn(&ex, async move {
            timer::sleep(Duration::from_millis(50)).await;
            d.set(true);
        });
        assert!(!done.get());

        ts.advance(60);
        Executor::flush_instance(&ex);
        assert!(done.get());
    }

    #[test]
    fn set_deferred_routes_to_instance_executor() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));

        let sig = Signal::new(0);
        let s = sig.clone();

        Executor::spawn(&ex, async move {
            crate::set_deferred(&s, 42);
        });

        Executor::flush_instance(&ex);
        assert_eq!(sig.read(), 42);
    }
1592
    // --- panic hook, current scope, misc ---

    #[test]
    fn panic_hook_is_invoked_on_task_panic() {
        init();
        let hook_called = Rc::new(Cell::new(false));
        let hc = Rc::clone(&hook_called);

        crate::set_panic_hook(Rc::new(move |_info| {
            hc.set(true);
        }));

        let scope = TaskScope::new();
        scope.spawn(async move { panic!("intentional") });

        assert!(hook_called.get());
    }

    #[test]
    fn current_scope_available_in_spawned_task() {
        init();
        let scope = TaskScope::new();
        let found = Rc::new(Cell::new(false));
        let f = Rc::clone(&found);
        scope.spawn(async move {
            f.set(crate::current_scope().is_some());
        });
        assert!(found.get());
    }

    #[test]
    fn callback_handle_noop_does_not_panic() {
        let _h = crate::CallbackHandle::noop();
    }

    #[test]
    fn sync_callback_fallback_without_schedule_hook() {
        // Deliberately skips init(): no flush scheduler installed, so the
        // subscription must fire synchronously as a fallback.
        crate::reset_executor_for_test();
        let sig = Signal::new(0);
        let called = Rc::new(Cell::new(false));
        let c = Rc::clone(&called);
        auralis_signal::subscribe(&sig, Rc::new(move || c.set(true)));

        sig.set(1);
        assert!(called.get());
    }

    #[test]
    fn set_deferred_isolated_to_instance_executor() {
        init();
        let ex1 = Executor::new_instance();
        Executor::install_flush_scheduler(&ex1, Rc::new(TestScheduleFlush));
        let ex2 = Executor::new_instance();
        Executor::install_flush_scheduler(&ex2, Rc::new(TestScheduleFlush));

        let sig1 = Signal::new(0);
        let sig2 = Signal::new(0);
        let s1 = sig1.clone();

        crate::with_executor(&ex1, || {
            crate::set_deferred(&s1, 42);
        });
        Executor::flush_instance(&ex1);
        assert_eq!(sig1.read(), 42);
        assert_eq!(sig2.read(), 0);
    }

    #[test]
    fn notify_signal_state_follow_up_handles_reentrant_dirty() {
        let sig = Signal::new(0);
        let sig2 = sig.clone();
        let count = Rc::new(Cell::new(0u32));
        let c = Rc::clone(&count);

        auralis_signal::subscribe(
            &sig,
            Rc::new(move || {
                c.set(c.get() + 1);
                // First invocation dirties the signal again from inside the
                // notification; the follow-up pass must run it once more.
                if c.get() == 1 {
                    sig2.set(2);
                }
            }),
        );

        sig.set(1);
        assert_eq!(sig.read(), 2);
        assert_eq!(count.get(), 2);
    }
}