1use std::any::{Any, TypeId};
6use std::cell::{Cell, RefCell};
7use std::collections::{HashMap, VecDeque};
8use std::fmt;
9use std::future::Future;
10use std::rc::{Rc, Weak};
11
12use auralis_signal::{Memo, Signal};
13
14use crate::executor;
15use crate::Priority;
16
/// Identifier of a task scope; used as the key of the scope registry.
type ScopeId = u64;
/// Identifier the executor returns for a spawned task.
type TaskId = u64;

thread_local! {
    /// Next scope id to hand out on this thread. Starts at 1 and only grows.
    static NEXT_SCOPE_ID: Cell<ScopeId> = const { Cell::new(1) };
}

/// Returns a fresh scope id.
///
/// Ids are drawn from a thread-local counter, so they are unique only within
/// the calling thread. The first id handed out on a thread is `1`.
fn alloc_scope_id() -> ScopeId {
    // `replace` stores the incremented value and yields the one being issued.
    NEXT_SCOPE_ID.with(|next| next.replace(next.get() + 1))
}
35
/// Guard that runs a cleanup closure once, when the guard is dropped.
///
/// A handle created with [`CallbackHandle::noop`] holds no closure and does
/// nothing on drop.
pub struct CallbackHandle {
    /// Pending cleanup; `None` for no-op handles and once the closure was taken.
    cleanup: Option<Box<dyn FnOnce() + 'static>>,
}

impl CallbackHandle {
    /// Wraps `cleanup` so that it is invoked when the returned handle is dropped.
    pub fn new(cleanup: impl FnOnce() + 'static) -> Self {
        let boxed: Box<dyn FnOnce() + 'static> = Box::new(cleanup);
        Self {
            cleanup: Some(boxed),
        }
    }

    /// Creates a handle that performs no work when dropped.
    #[must_use]
    pub fn noop() -> Self {
        Self { cleanup: None }
    }
}

impl Drop for CallbackHandle {
    /// Runs the stored cleanup, if any.
    ///
    /// A panic raised by the cleanup is caught and discarded so that it does
    /// not propagate out of `drop`.
    fn drop(&mut self) {
        let Some(cleanup) = self.cleanup.take() else {
            return;
        };
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(cleanup));
        drop(outcome);
    }
}
76
77type ScopeRegistryEntry = (Weak<RefCell<TaskScopeInner>>, Weak<Cell<bool>>);
98
99thread_local! {
100 static SCOPE_REGISTRY: RefCell<HashMap<ScopeId, ScopeRegistryEntry>> =
101 RefCell::new(HashMap::new());
102}
103
104fn register_scope(id: ScopeId, inner: &Rc<RefCell<TaskScopeInner>>, suspended: &Rc<Cell<bool>>) {
107 let _ = SCOPE_REGISTRY.try_with(|reg| {
108 if let Ok(mut r) = reg.try_borrow_mut() {
109 r.insert(id, (Rc::downgrade(inner), Rc::downgrade(suspended)));
110 }
111 });
112}
113
114fn unregister_scope(id: ScopeId) {
115 let _ = SCOPE_REGISTRY.try_with(|reg| {
116 if let Ok(mut r) = reg.try_borrow_mut() {
117 r.remove(&id);
118 }
119 });
120}
121
122#[must_use]
126pub fn find_scope(scope_id: ScopeId) -> Option<TaskScope> {
127 SCOPE_REGISTRY
128 .try_with(|reg| {
129 if let Ok(r) = reg.try_borrow() {
130 r.get(&scope_id).and_then(|(inner_weak, suspended_weak)| {
131 let inner = inner_weak.upgrade()?;
132 let suspended = suspended_weak.upgrade()?;
133 let cancelled = inner.borrow().cancelled.clone();
134 Some(TaskScope {
135 inner,
136 cancelled,
137 suspended,
138 })
139 })
140 } else {
141 None
142 }
143 })
144 .ok()
145 .flatten()
146}
147
/// Returns the debug label of the scope registered under `scope_id`.
///
/// `None` when the scope cannot be found or has no label set.
#[cfg(feature = "debug")]
#[doc(hidden)]
#[must_use]
pub fn scope_debug_label(scope_id: ScopeId) -> Option<String> {
    let scope = find_scope(scope_id)?;
    let state = scope.inner.borrow();
    state.debug_label.clone()
}
157
158#[doc(hidden)]
160pub fn clear_scope_registry() {
161 let _ = SCOPE_REGISTRY.try_with(|reg| {
162 if let Ok(mut r) = reg.try_borrow_mut() {
163 r.clear();
164 }
165 });
166}
167
/// Function that installs the given scope as the current one (`None` clears it).
type ScopeSetFn = fn(Option<TaskScope>);
/// Function that returns a handle to the current scope, if one is installed.
type ScopeGetFn = fn() -> Option<TaskScope>;

/// Pluggable storage backend for the "current scope".
///
/// All reads and writes of the current scope in this module go through the
/// two function pointers held here.
pub struct ScopeStore {
    /// Replaces the stored current scope.
    pub set_fn: ScopeSetFn,
    /// Reads the stored current scope.
    pub get_fn: ScopeGetFn,
}
192
193use std::sync::OnceLock;
194static SCOPE_STORE: OnceLock<ScopeStore> = OnceLock::new();
195
196fn ensure_default_store() -> &'static ScopeStore {
197 SCOPE_STORE.get_or_init(|| ScopeStore {
198 set_fn: thread_local_set,
199 get_fn: thread_local_get,
200 })
201}
202
203pub fn set_scope_store(store: ScopeStore) {
219 let _ = SCOPE_STORE.set(store);
220}
221
222thread_local! {
249 static CURRENT_SCOPE: RefCell<Option<TaskScope>> = const { RefCell::new(None) };
250}
251
252fn thread_local_set(scope: Option<TaskScope>) {
253 CURRENT_SCOPE.with(|cell| {
254 cell.replace(scope);
255 });
256}
257
258fn thread_local_get() -> Option<TaskScope> {
259 CURRENT_SCOPE.with(|cell| cell.borrow().clone())
260}
261
262pub(crate) fn set_scope_direct(scope: Option<TaskScope>) {
267 let store = ensure_default_store();
268 (store.set_fn)(scope);
269}
270
271pub(crate) fn get_scope_direct() -> Option<TaskScope> {
273 let store = ensure_default_store();
274 (store.get_fn)()
275}
276
/// Installs a [`ScopeStore`] whose slot is a Tokio task-local instead of the
/// default thread-local.
///
/// Like [`set_scope_store`], this only takes effect if no store has been
/// installed yet.
///
/// NOTE(review): `TK_SCOPE` is declared inside this function and nothing here
/// enters `TK_SCOPE.scope(..)` / `sync_scope(..)`. Per the Tokio docs a
/// task-local is only readable inside such a scope and `try_with` returns
/// `Err` otherwise. If no other code can establish the scope, `set_fn` is a
/// silent no-op and `get_fn` always returns `None` — confirm intended usage.
#[cfg(feature = "ssr-tokio")]
pub fn init_scope_store_tokio() {
    tokio::task_local! {
        static TK_SCOPE: std::cell::RefCell<Option<TaskScope>>;
    }

    // Clears the slot if the task-local is currently accessible; the result
    // is ignored either way.
    let _ = TK_SCOPE.try_with(|cell| {
        cell.replace(None);
    });

    set_scope_store(ScopeStore {
        // Non-capturing closures coerce to the plain `fn` pointers the store holds.
        set_fn: |s| {
            let _ = TK_SCOPE.try_with(|cell| {
                cell.replace(s);
            });
        },
        get_fn: || {
            TK_SCOPE
                .try_with(|cell| cell.borrow().clone())
                .ok()
                .flatten()
        },
    });
}
317
318pub fn with_current_scope<R>(scope: &TaskScope, f: impl FnOnce() -> R) -> R {
328 let store = ensure_default_store();
329 let prev = (store.get_fn)();
330 (store.set_fn)(Some(scope.clone_inner()));
331 let result = f();
332 (store.set_fn)(prev);
333 result
334}
335
336#[must_use]
338pub fn current_scope() -> Option<TaskScope> {
339 let store = ensure_default_store();
340 (store.get_fn)()
341}
342
/// Shared state behind every [`TaskScope`] handle that refers to the same scope.
struct TaskScopeInner {
    /// Id allocated at construction; key of this scope in the registry.
    id: ScopeId,
    /// Ids of the tasks spawned through this scope; cancelled on teardown.
    task_ids: Vec<TaskId>,
    /// Strong handles to the child scopes created with `TaskScope::new_child`.
    children: Vec<TaskScope>,
    /// Weak link to the parent's state; followed by `consume` to find context.
    parent: Option<Weak<RefCell<TaskScopeInner>>>,
    /// Context values keyed by their type. Has its own `RefCell` so it can be
    /// mutated while the outer state is only immutably borrowed.
    context: RefCell<HashMap<TypeId, Rc<dyn Any>>>,
    /// Cleanup handles owned by the scope; cleared (and thereby run) on teardown.
    callbacks: RefCell<Vec<CallbackHandle>>,
    /// Cancellation flag, shared with the `TaskScope` handles.
    cancelled: Rc<Cell<bool>>,
    /// Optional human-readable label for diagnostics.
    #[cfg(feature = "debug")]
    debug_label: Option<String>,
    /// Executor on which this scope's tasks are spawned and cancelled.
    executor: executor::ExecutorRef,
}
371
372pub struct JoinHandle {
382 task_id: Option<TaskId>,
383 executor: executor::ExecutorRef,
384}
385
386impl JoinHandle {
387 pub fn cancel(&self) {
392 if let Some(tid) = self.task_id {
393 executor::cancel_task(&self.executor, tid);
394 }
395 }
396
397 pub fn is_finished(&self) -> bool {
402 match self.task_id {
403 Some(tid) => executor::is_task_finished(&self.executor, tid),
404 None => true,
405 }
406 }
407
408 pub fn task_id(&self) -> Option<TaskId> {
411 self.task_id
412 }
413}
414
415impl fmt::Debug for JoinHandle {
416 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
417 f.debug_struct("JoinHandle")
418 .field("task_id", &self.task_id)
419 .finish_non_exhaustive()
420 }
421}
422
/// Handle to a task scope.
///
/// Handles are reference counted: cloning yields another handle to the same
/// scope, and the scope is torn down when the last handle is dropped (see the
/// `Drop` implementation).
#[must_use]
pub struct TaskScope {
    /// Shared scope state.
    inner: Rc<RefCell<TaskScopeInner>>,
    /// Cancellation flag; the same `Rc` is also stored inside `inner`, which
    /// lets `is_cancelled` read it without borrowing the `RefCell`.
    cancelled: Rc<Cell<bool>>,
    /// Suspension flag; also reachable through the scope registry.
    suspended: Rc<Cell<bool>>,
}
461
462impl TaskScope {
463 pub fn new() -> Self {
467 Self::with_executor(&executor::current_executor_instance())
468 }
469
470 pub fn with_executor(ex: &executor::ExecutorRef) -> Self {
476 let cancelled = Rc::new(Cell::new(false));
477 let inner = Rc::new(RefCell::new(TaskScopeInner {
478 id: alloc_scope_id(),
479 task_ids: Vec::new(),
480 children: Vec::new(),
481 parent: None,
482 context: RefCell::new(HashMap::new()),
483 callbacks: RefCell::new(Vec::new()),
484 cancelled: Rc::clone(&cancelled),
485 #[cfg(feature = "debug")]
486 debug_label: None,
487 executor: Rc::clone(ex),
488 }));
489 let id = inner.borrow().id;
490 let suspended = Rc::new(Cell::new(false));
491 register_scope(id, &inner, &suspended);
492 Self {
493 inner,
494 cancelled,
495 suspended,
496 }
497 }
498
499 pub fn new_child(parent: &Self) -> Self {
501 let ex = parent.inner.borrow().executor.clone();
502 let cancelled = Rc::new(Cell::new(false));
503 let inner = Rc::new(RefCell::new(TaskScopeInner {
504 id: alloc_scope_id(),
505 task_ids: Vec::new(),
506 children: Vec::new(),
507 parent: Some(Rc::downgrade(&parent.inner)),
508 context: RefCell::new(HashMap::new()),
509 callbacks: RefCell::new(Vec::new()),
510 cancelled: Rc::clone(&cancelled),
511 #[cfg(feature = "debug")]
512 debug_label: None,
513 executor: ex,
514 }));
515 let id = inner.borrow().id;
516 let suspended = Rc::new(Cell::new(false));
517 register_scope(id, &inner, &suspended);
518 let child = Self {
519 inner,
520 cancelled,
521 suspended,
522 };
523 parent.inner.borrow_mut().children.push(child.clone_inner());
524 child
525 }
526
527 pub fn spawn(&self, future: impl Future<Output = ()> + 'static) -> JoinHandle {
533 self.spawn_with_priority(Priority::Low, future)
534 }
535
536 pub fn spawn_with_priority(
544 &self,
545 priority: Priority,
546 future: impl Future<Output = ()> + 'static,
547 ) -> JoinHandle {
548 let inner = self.inner.borrow();
549 if inner.cancelled.get() {
550 return JoinHandle {
551 task_id: None,
552 executor: Rc::clone(&inner.executor),
553 };
554 }
555 let ex = Rc::clone(&inner.executor);
556 let task_id = executor::with_executor(&ex, || {
557 with_current_scope(self, || {
558 executor::spawn_scoped_on(&ex, priority, inner.id, future)
559 })
560 });
561 drop(inner);
562 self.inner.borrow_mut().task_ids.push(task_id);
563 JoinHandle {
564 task_id: Some(task_id),
565 executor: ex,
566 }
567 }
568
569 pub fn watch<T: Clone + 'static>(
582 &self,
583 sig: &Signal<T>,
584 f: impl FnMut(&T) + 'static,
585 ) -> JoinHandle {
586 let s = sig.clone();
587 let mut f = f;
588 self.spawn(async move {
589 loop {
590 s.changed().await;
591 f(&s.read());
592 }
593 })
594 }
595
596 pub fn watch_effect(&self, effect: impl Fn() + 'static) -> JoinHandle {
605 let memo = Memo::new(effect);
606 self.spawn(async move {
607 loop {
608 memo.changed().await;
609 #[allow(clippy::let_unit_value, clippy::ignored_unit_patterns)]
610 let _ = memo.read();
611 }
612 })
613 }
614
615 pub fn register_callback_handle(&self, handle: CallbackHandle) {
623 let inner = self.inner.borrow();
624 if inner.cancelled.get() {
625 return;
626 }
627 inner.callbacks.borrow_mut().push(handle);
628 }
629
630 pub fn on_cleanup(&self, f: impl FnOnce() + 'static) {
639 self.register_callback_handle(CallbackHandle::new(f));
640 }
641
642 pub fn provide<T: 'static>(&self, value: T) {
650 self.inner
651 .borrow()
652 .context
653 .borrow_mut()
654 .insert(TypeId::of::<T>(), Rc::new(value));
655 }
656
657 #[must_use]
662 pub fn consume<T: 'static>(&self) -> Option<Rc<T>> {
663 let mut current = Some(Rc::clone(&self.inner));
664
665 while let Some(inner) = current {
666 {
668 let inner_ref = inner.borrow();
669 let ctx = inner_ref.context.borrow();
670 if let Some(val) = ctx.get(&TypeId::of::<T>()) {
671 if let Ok(downcast) = val.clone().downcast::<T>() {
672 return Some(downcast);
673 }
674 }
675 }
676
677 let parent = {
679 let inner_ref = inner.borrow();
680 inner_ref.parent.as_ref().and_then(Weak::upgrade)
681 };
682 current = parent;
683 }
684
685 None
686 }
687
688 #[must_use]
695 #[track_caller]
696 pub fn expect_context<T: 'static>(&self) -> Rc<T> {
697 self.consume::<T>()
698 .unwrap_or_else(|| panic!("context not found: {}", std::any::type_name::<T>()))
699 }
700
701 #[must_use]
705 pub fn is_cancelled(&self) -> bool {
706 self.cancelled.get()
707 }
708
709 #[cfg(feature = "debug")]
715 pub fn set_debug_label(&self, label: impl Into<String>) {
716 self.inner.borrow_mut().debug_label = Some(label.into());
717 }
718
719 #[cfg(test)]
723 #[must_use]
724 pub fn task_count(&self) -> usize {
725 self.inner.borrow().task_ids.len()
726 }
727
728 #[cfg(test)]
730 #[must_use]
731 pub fn child_count(&self) -> usize {
732 self.inner.borrow().children.len()
733 }
734
735 fn clone_inner(&self) -> Self {
738 Self {
739 inner: Rc::clone(&self.inner),
740 cancelled: Rc::clone(&self.cancelled),
741 suspended: Rc::clone(&self.suspended),
742 }
743 }
744
745 pub fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
750 with_current_scope(self, f)
751 }
752
753 pub fn suspend(&self) {
763 if self.suspended.get() {
764 return;
765 }
766 self.suspended.set(true);
767 let children: Vec<TaskScope> = {
769 self.inner
770 .borrow()
771 .children
772 .iter()
773 .map(TaskScope::clone_inner)
774 .collect()
775 };
776 for child in &children {
777 child.suspend();
778 }
779 }
780
781 pub fn resume(&self) {
786 if !self.suspended.get() {
787 return;
788 }
789 self.suspended.set(false);
790
791 let (task_ids, children) = {
792 let inner = self.inner.borrow();
793 let tids = inner.task_ids.clone();
794 let children: Vec<TaskScope> =
795 inner.children.iter().map(TaskScope::clone_inner).collect();
796 (tids, children)
797 };
798
799 let ex = Rc::clone(&self.inner.borrow().executor);
801 executor::enqueue_scope_tasks_on(&ex, &task_ids);
802
803 for child in &children {
805 child.resume();
806 }
807 }
808
809 #[must_use]
811 pub fn is_suspended(&self) -> bool {
812 self.suspended.get()
813 }
814}
815
816impl Default for TaskScope {
817 fn default() -> Self {
818 Self::new()
819 }
820}
821
822impl Clone for TaskScope {
823 fn clone(&self) -> Self {
824 self.clone_inner()
825 }
826}
827
/// Tears the scope down when the last handle to it goes away.
///
/// Teardown sets the cancelled flag, drops the cleanup callbacks, cancels the
/// recorded tasks, clears the context and unregisters the scope — first for
/// every descendant, then for the scope itself.
impl Drop for TaskScope {
    fn drop(&mut self) {
        // Another handle (a clone, or the parent's child list) still refers to
        // this scope; only the final handle performs the teardown.
        if Rc::strong_count(&self.inner) > 1 {
            return;
        }

        // Set the shared flag first so holders of it observe the cancellation
        // even if the borrow below fails.
        self.cancelled.set(true);

        // Never panic inside `drop`: if the state is borrowed elsewhere, warn
        // and skip the teardown.
        let Ok(mut inner) = self.inner.try_borrow_mut() else {
            eprintln!(
                "[auralis-task] WARNING: TaskScope::drop cannot borrow inner \
                 (already borrowed). Tasks and callbacks in this scope will \
                 be cleaned up on the next executor flush. Avoid dropping \
                 the last TaskScope clone inside a callback."
            );
            return;
        };

        // Dropping the handles runs this scope's cleanup callbacks before any
        // task is cancelled.
        // NOTE(review): the callbacks run while `inner` is mutably borrowed.
        inner.callbacks.borrow_mut().clear();

        // Collect all descendants breadth-first with an explicit queue rather
        // than recursion, so deep trees do not grow the call stack.
        let mut descendants: Vec<Rc<RefCell<TaskScopeInner>>> = Vec::new();
        {
            let mut queue: VecDeque<Rc<RefCell<TaskScopeInner>>> = VecDeque::new();
            for child in &inner.children {
                queue.push_back(Rc::clone(&child.inner));
            }

            while let Some(scope_rc) = queue.pop_front() {
                // NOTE(review): `borrow` panics if a descendant is mutably
                // borrowed at this point — confirm that cannot happen.
                let scope = scope_rc.borrow();
                for child in &scope.children {
                    queue.push_back(Rc::clone(&child.inner));
                }
                descendants.push(Rc::clone(&scope_rc));
            }
        }

        // Reverse breadth-first order: later-discovered (deeper) scopes are
        // torn down before the scopes above them.
        for scope_rc in descendants.iter().rev() {
            // NOTE(review): `borrow_mut` panics if the descendant is borrowed
            // elsewhere, unlike the `try_borrow_mut` used for `self` above.
            let mut scope = scope_rc.borrow_mut();
            // Already cancelled descendants are skipped entirely.
            if scope.cancelled.get() {
                continue;
            }
            scope.cancelled.set(true);

            // Callbacks first, then tasks — same order as for the root.
            scope.callbacks.borrow_mut().clear();

            if !scope.task_ids.is_empty() {
                let ex = Rc::clone(&scope.executor);
                let task_ids = std::mem::take(&mut scope.task_ids);
                // The executor returns what it removed; dropping it here
                // destroys the cancelled futures.
                let dropped_futures = executor::cancel_scope_tasks_on(&ex, &task_ids);
                drop(dropped_futures);
            }
            scope.context.borrow_mut().clear();
            unregister_scope(scope.id);
        }

        // Finally cancel the tasks spawned directly on this scope.
        if !inner.task_ids.is_empty() {
            let ex = Rc::clone(&inner.executor);
            let task_ids = std::mem::take(&mut inner.task_ids);
            let dropped_futures = executor::cancel_scope_tasks_on(&ex, &task_ids);
            drop(dropped_futures);
        }

        inner.context.borrow_mut().clear();
        // `descendants` still holds a strong reference to every descendant, so
        // the child handles dropped here see a count above one and return early.
        inner.children.clear();

        unregister_scope(inner.id);
    }
}
919
/// Stores a value in a scope's context.
///
/// `provide_context!(scope, value)` expands to `scope.provide(value)`.
#[macro_export]
macro_rules! provide_context {
    ($scope:expr, $value:expr) => {
        $scope.provide($value)
    };
}
935
/// Looks up a context value of the given type on a scope.
///
/// `consume_context!(scope, Ty)` expands to `scope.consume::<Ty>()`.
#[macro_export]
macro_rules! consume_context {
    ($scope:expr, $ty:ty) => {
        $scope.consume::<$ty>()
    };
}
947
948#[cfg(test)]
953#[allow(clippy::items_after_statements)]
954mod tests {
955 use super::*;
956 use crate::executor::{self, init_flush_scheduler, reset_executor_for_test, TestScheduleFlush};
957 use crate::{init_time_source, ScheduleFlush, TestTimeSource, TimeSource};
958 use auralis_signal::Signal;
959 use std::cell::{Cell, RefCell};
960 use std::rc::Rc;
961 use std::time::Duration;
962
    /// Shared test setup: resets the executor, then installs the test flush
    /// scheduler.
    fn init() {
        reset_executor_for_test();
        init_flush_scheduler(Rc::new(TestScheduleFlush));
    }
967
    /// A freshly created scope records no tasks and no children.
    #[test]
    fn new_scope_has_zero_tasks() {
        let scope = TaskScope::new();
        assert_eq!(scope.task_count(), 0);
        assert_eq!(scope.child_count(), 0);
    }
976
    /// `new_child` adds the child to the parent's child list.
    #[test]
    fn new_child_attaches_to_parent() {
        let parent = TaskScope::new();
        let _child = TaskScope::new_child(&parent);
        assert_eq!(parent.child_count(), 1);
    }
983
    /// Spawning records exactly one task id on the scope.
    #[test]
    fn spawn_adds_task() {
        init();
        let scope = TaskScope::new();
        scope.spawn(async {});
        assert_eq!(scope.task_count(), 1);
    }
991
    /// A task passed to `spawn_global` has run by the time the call returns
    /// (under the test setup from `init`).
    #[test]
    fn spawn_and_complete() {
        init();
        let done = Rc::new(Cell::new(false));
        let done2 = Rc::clone(&done);
        spawn_global(async move {
            done2.set(true);
        });
        assert!(done.get());
    }
1002
    /// Dropping a scope drops the future of a still-pending task (observed via
    /// a drop guard inside the future) and removes the task from the executor.
    #[test]
    fn scope_spawn_and_cancel() {
        init();
        let dropped = Rc::new(Cell::new(false));
        {
            let scope = TaskScope::new();
            let d = Rc::clone(&dropped);
            // Sets the shared flag when the owning future is dropped.
            struct DropCheck(Rc<Cell<bool>>);
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.0.set(true);
                }
            }
            scope.spawn(async move {
                let _guard = DropCheck(d);
                std::future::pending::<()>().await;
            });
            assert_eq!(executor::debug_task_count(), 1);
        }
        assert!(dropped.get());
        assert_eq!(executor::debug_task_count(), 0);
    }
1025
    /// A pending task spawned on a child scope is cancelled once both the
    /// parent and the child handle have gone out of scope.
    #[test]
    fn nested_scope_child_cancel_with_parent() {
        init();
        let dropped_child = Rc::new(Cell::new(false));
        {
            let parent = TaskScope::new();
            let child = TaskScope::new_child(&parent);
            let d = Rc::clone(&dropped_child);
            // Sets the shared flag when the owning future is dropped.
            struct DropCheck(Rc<Cell<bool>>);
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.0.set(true);
                }
            }
            child.spawn(async move {
                let _guard = DropCheck(d);
                std::future::pending::<()>().await;
            });
            assert_eq!(executor::debug_task_count(), 1);
        }
        assert!(dropped_child.get());
        assert_eq!(executor::debug_task_count(), 0);
    }
1049
1050 #[test]
1051 fn deeply_nested_scope_drop_no_stack_overflow() {
1052 init();
1053 let root = TaskScope::new();
1054 {
1055 let mut current = TaskScope::new_child(&root);
1056 for _ in 0..199 {
1057 current = TaskScope::new_child(¤t);
1058 }
1059 }
1060 drop(root);
1061 assert_eq!(executor::debug_task_count(), 0);
1062 }
1063
    /// Child counts reflect direct children only: root has `a` and `b`, `a`
    /// has two children, `b` has none.
    #[test]
    fn scope_child_explicit_tree() {
        let root = TaskScope::new();
        let a = TaskScope::new_child(&root);
        let b = TaskScope::new_child(&root);
        let _a1 = TaskScope::new_child(&a);
        let _a2 = TaskScope::new_child(&a);
        assert_eq!(root.child_count(), 2);
        assert_eq!(a.child_count(), 2);
        assert_eq!(b.child_count(), 0);
    }
1075
    /// On scope drop, registered cleanup callbacks run before the scope's
    /// task futures are dropped.
    #[test]
    fn callback_handle_dropped_before_tasks() {
        init();
        // Records the order in which the callback and the task guard fire.
        let dropped_order: Rc<RefCell<Vec<String>>> = Rc::new(RefCell::new(Vec::new()));
        {
            let scope = TaskScope::new();
            let order1 = Rc::clone(&dropped_order);
            scope.register_callback_handle(CallbackHandle::new(move || {
                order1.borrow_mut().push("callback".to_string());
            }));
            let order2 = Rc::clone(&dropped_order);
            // Appends its label to the shared log when dropped.
            struct DropCheck {
                order: Rc<RefCell<Vec<String>>>,
                label: String,
            }
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.order.borrow_mut().push(self.label.clone());
                }
            }
            scope.spawn(async move {
                let _guard = DropCheck {
                    order: order2,
                    label: "task".to_string(),
                };
                std::future::pending::<()>().await;
            });
        }
        let order = dropped_order.borrow().clone();
        assert_eq!(order, vec!["callback", "task"]);
    }
1109
    /// A cleanup callback registered on a child scope has run once parent and
    /// child have both gone out of scope.
    #[test]
    fn callback_handle_cleaned_up_on_child_scope_drop() {
        init();
        let called = Rc::new(Cell::new(false));
        {
            let parent = TaskScope::new();
            let child = TaskScope::new_child(&parent);
            let c = Rc::clone(&called);
            child.register_callback_handle(CallbackHandle::new(move || {
                c.set(true);
            }));
        }
        assert!(called.get());
    }
1125
    /// A value provided on a scope can be consumed from that same scope.
    #[test]
    fn context_provide_and_consume_in_same_scope() {
        let scope = TaskScope::new();
        scope.provide(42i32);
        assert_eq!(*scope.consume::<i32>().unwrap(), 42);
    }
1134
    /// `consume` on a child finds a value provided on its parent.
    #[test]
    fn context_consume_walks_up_to_parent() {
        let parent = TaskScope::new();
        parent.provide("hello".to_string());
        let child = TaskScope::new_child(&parent);
        assert_eq!(*child.consume::<String>().unwrap(), "hello");
    }
1142
    /// `consume` returns `None` when nothing of the requested type was provided.
    #[test]
    fn context_consume_not_found() {
        let scope = TaskScope::new();
        assert!(scope.consume::<i32>().is_none());
    }
1148
    /// Dropping a child handle leaves the parent's own context intact.
    #[test]
    fn context_removed_on_scope_drop() {
        let parent = TaskScope::new();
        parent.provide(99u32);
        {
            let _child = TaskScope::new_child(&parent);
        }
        assert_eq!(*parent.consume::<u32>().unwrap(), 99);
    }
1160
    /// A value provided on a child shadows the parent's value of the same type
    /// for lookups from the child, while the parent still sees its own value.
    #[test]
    fn context_shadowing() {
        let parent = TaskScope::new();
        parent.provide(1i32);
        let child = TaskScope::new_child(&parent);
        child.provide(2i32);
        assert_eq!(*child.consume::<i32>().unwrap(), 2);
        assert_eq!(*parent.consume::<i32>().unwrap(), 1);
    }
1172
    /// `expect_context` panics with a "context not found" message when the
    /// requested type was never provided.
    #[test]
    #[should_panic(expected = "context not found")]
    fn expect_context_panics_when_missing() {
        let scope = TaskScope::new();
        let _ = scope.expect_context::<String>();
    }
1179
    /// With both tasks queued before the flush, the high-priority task runs
    /// before the low-priority one even though it was spawned second.
    #[test]
    fn executor_priority_ordering() {
        init();
        let order = Rc::new(RefCell::new(Vec::new()));
        let o1 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::Low, async move {
            o1.borrow_mut().push("low");
        });
        let o2 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::High, async move {
            o2.borrow_mut().push("high");
        });
        executor::flush_all();
        let result = order.borrow().clone();
        assert_eq!(result, vec!["high", "low"]);
    }
1198
    /// Ten globally spawned tasks each run once, and none remain afterwards.
    #[test]
    fn executor_batch() {
        init();
        let counter = Rc::new(Cell::new(0u32));
        for _ in 0..10 {
            let c = Rc::clone(&counter);
            spawn_global(async move {
                c.set(c.get() + 1);
            });
        }
        assert_eq!(counter.get(), 10);
        assert_eq!(executor::debug_task_count(), 0);
    }
1212
    /// Repeatedly creating a scope with pending tasks and dropping it leaves
    /// no tasks behind in the executor.
    #[test]
    fn no_leak_on_cancel() {
        init();
        for _ in 0..50 {
            let scope = TaskScope::new();
            for _ in 0..5 {
                scope.spawn(std::future::pending::<()>());
            }
        }
        assert_eq!(executor::debug_task_count(), 0);
    }
1224
    /// After `set_deferred`, the new value is visible both to a direct read
    /// and to a task spawned afterwards.
    #[test]
    fn set_deferred_triggers_after_flush() {
        use auralis_signal::Signal;
        init();
        let sig = Signal::new(0);
        let observed = Rc::new(Cell::new(0));
        set_deferred(&sig, 42);
        assert_eq!(sig.read(), 42);
        let ob1 = Rc::clone(&observed);
        spawn_global(async move {
            ob1.set(sig.read());
        });
        assert_eq!(observed.get(), 42);
    }
1239
    /// `set_deferred` may be called from a `Drop` implementation; the value is
    /// visible after the guard has been dropped.
    #[test]
    fn set_deferred_in_drop_safe() {
        use auralis_signal::Signal;
        init();
        let sig = Signal::new(0);
        // Writes `val` into `sig` via `set_deferred` when dropped.
        struct SetOnDrop {
            sig: Signal<i32>,
            val: i32,
        }
        impl Drop for SetOnDrop {
            fn drop(&mut self) {
                set_deferred(&self.sig, self.val);
            }
        }
        let guard = SetOnDrop {
            sig: sig.clone(),
            val: 99,
        };
        drop(guard);
        assert_eq!(sig.read(), 99);
    }
1261
    /// A drop guard living inside a cancelled task's future may call
    /// `set_deferred`; the value is visible after the scope has been dropped.
    #[test]
    fn set_deferred_from_drop_guard_during_scope_cancel() {
        use auralis_signal::Signal;
        init();

        let sig = Signal::new(0i32);

        // Writes 42 into `sig` via `set_deferred` when dropped.
        struct ResetOnDrop {
            sig: Signal<i32>,
        }
        impl Drop for ResetOnDrop {
            fn drop(&mut self) {
                set_deferred(&self.sig, 42);
            }
        }

        {
            let scope = TaskScope::new();
            let s = sig.clone();
            scope.spawn(async move {
                let _guard = ResetOnDrop { sig: s };
                std::future::pending::<()>().await;
            });
        }

        assert_eq!(
            sig.read(),
            42,
            "set_deferred should have fired after scope cancel"
        );
    }
1300
    /// After task A yields, task B runs to completion before A continues:
    /// the first three entries are `a1, b1, b2`, and `a2` appears later.
    #[test]
    fn yield_now_gives_other_tasks_a_turn() {
        init();
        let order = Rc::new(RefCell::new(Vec::new()));
        let o1 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::Low, async move {
            o1.borrow_mut().push("a1");
            executor::yield_now().await;
            o1.borrow_mut().push("a2");
        });
        let o2 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::Low, async move {
            o2.borrow_mut().push("b1");
            o2.borrow_mut().push("b2");
        });
        executor::flush_all();
        let r = order.borrow().clone();
        assert_eq!(&r[0..3], &["a1", "b1", "b2"][..]);
        assert!(r.contains(&"a2"));
    }
1321
    /// A panicking task does not stop a later task from running, and neither
    /// task remains in the executor afterwards.
    #[test]
    fn panic_in_task_is_isolated() {
        init();
        let survived = Rc::new(Cell::new(false));
        let s = Rc::clone(&survived);
        spawn_global(async move {
            panic!("intentional test panic");
        });
        spawn_global(async move {
            s.set(true);
        });
        assert!(survived.get());
        assert_eq!(executor::debug_task_count(), 0);
    }
1336
    /// With the test scheduler from `init`, all 50 tasks complete within a
    /// single `flush_all` call even though each task advances the test clock.
    #[test]
    fn time_budget_with_test_time_source() {
        init();
        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(ts.clone());

        let polled = Rc::new(Cell::new(0u32));

        for _ in 0..50 {
            let pc = Rc::clone(&polled);
            let ts_c = Rc::clone(&ts);
            executor::spawn_no_auto_flush(Priority::Low, async move {
                pc.set(pc.get() + 1);
                // Each task moves the test clock forward by one unit.
                ts_c.advance(1);
            });
        }

        executor::flush_all();

        assert_eq!(polled.get(), 50);
        assert_eq!(executor::debug_task_count(), 0);
    }
1366
    /// With a scheduler that only counts `schedule` calls (and never runs the
    /// callback), one `flush_all` runs some but not all of the 50 tasks and
    /// requests exactly one follow-up flush.
    ///
    /// NOTE(review): unlike most tests here this one does not call `init()`,
    /// so the executor is not reset first — confirm that is intended.
    #[test]
    fn time_budget_honoured_with_split() {
        let schedule_count = Rc::new(Cell::new(0u32));
        // Counts schedule requests and drops the callback without running it.
        struct NoopScheduleFlush(Rc<Cell<u32>>);
        impl ScheduleFlush for NoopScheduleFlush {
            fn schedule(&self, _callback: Box<dyn FnOnce()>) {
                self.0.set(self.0.get() + 1);
            }
        }
        init_flush_scheduler(Rc::new(NoopScheduleFlush(Rc::clone(&schedule_count))));

        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(ts.clone());

        let polled = Rc::new(RefCell::new(Vec::new()));

        for i in 0..50u32 {
            let pc = Rc::clone(&polled);
            let ts_c = Rc::clone(&ts);
            executor::spawn_no_auto_flush(Priority::Low, async move {
                pc.borrow_mut().push(i);
                // Each task moves the test clock forward by one unit.
                ts_c.advance(1);
            });
        }

        executor::flush_all();

        let completed = polled.borrow().len();
        assert!(
            completed < 50,
            "budget should split before all tasks run (only {completed} of 50)"
        );
        assert!(
            completed >= 7,
            "at least 7 tasks should run before budget expires ({completed})"
        );
        assert_eq!(
            schedule_count.get(),
            1,
            "next flush should have been scheduled exactly once"
        );

        // Switch back to the test scheduler and flush again; afterwards no
        // tasks may remain.
        init_flush_scheduler(Rc::new(TestScheduleFlush));
        executor::flush_all();
        assert_eq!(executor::debug_task_count(), 0);
    }
1420
    /// `provide_context!` stores a value that `consume` can read back.
    #[test]
    fn provide_context_macro_works() {
        let scope = TaskScope::new();
        provide_context!(scope, 42i32);
        assert_eq!(*scope.consume::<i32>().unwrap(), 42);
    }
1429
    /// `consume_context!` reads back a value stored with `provide`.
    #[test]
    fn consume_context_macro_works() {
        let scope = TaskScope::new();
        scope.provide(99u32);
        let val: Option<Rc<u32>> = consume_context!(scope, u32);
        assert_eq!(*val.unwrap(), 99);
    }
1437
    /// `consume_context!` yields `None` for a type that was never provided.
    #[test]
    fn consume_context_macro_not_found() {
        let scope = TaskScope::new();
        let val: Option<Rc<String>> = consume_context!(scope, String);
        assert!(val.is_none());
    }
1444
    /// With one pending scoped task, the task-tree dump contains the header,
    /// a total of one active task, and a scope entry.
    #[cfg(feature = "debug")]
    #[test]
    fn dump_task_tree_returns_string() {
        init();
        let scope = TaskScope::new();
        scope.spawn(async { std::future::pending::<()>().await });

        let output = crate::dump_task_tree();
        assert!(output.contains("Auralis Task Tree"));
        assert!(output.contains("Total active tasks: 1"));
        assert!(output.contains("Scope"));
    }
1459
    /// With no tasks, the task-tree dump reports "(no active tasks)".
    #[cfg(feature = "debug")]
    #[test]
    fn dump_task_tree_empty() {
        init();
        let output = crate::dump_task_tree();
        assert!(output.contains("(no active tasks)"));
    }
1467
1468 use crate::{set_deferred, spawn_global};
1469
    /// A task spawned on a suspended scope does not run, whereas a task
    /// spawned before the suspension ran right away.
    #[test]
    fn suspend_prevents_task_execution() {
        init();
        let scope = TaskScope::new();
        let executed = Rc::new(Cell::new(false));
        let ex = Rc::clone(&executed);
        scope.spawn(async move {
            ex.set(true);
        });
        assert!(executed.get());
        // Reset the flag so the second spawn can be observed separately.
        executed.set(false);

        scope.suspend();
        let ex2 = Rc::clone(&executed);
        scope.spawn(async move {
            ex2.set(true);
        });
        assert!(!executed.get());
    }
1493
    /// A task spawned while the scope is suspended runs once the scope is
    /// resumed.
    #[test]
    fn resume_allows_task_execution() {
        init();
        let scope = TaskScope::new();
        scope.suspend();
        let executed = Rc::new(Cell::new(false));
        let ex = Rc::clone(&executed);
        scope.spawn(async move {
            ex.set(true);
        });
        assert!(!executed.get());

        scope.resume();
        assert!(executed.get());
    }
1510
    /// Suspending a parent also marks its existing child as suspended.
    #[test]
    fn suspend_cascades_to_children() {
        init();
        let parent = TaskScope::new();
        let child = TaskScope::new_child(&parent);
        assert!(!child.is_suspended());

        parent.suspend();
        assert!(parent.is_suspended());
        assert!(child.is_suspended());
    }
1522
    /// Resuming a parent also clears the suspended flag of its child.
    #[test]
    fn resume_cascades_to_children() {
        init();
        let parent = TaskScope::new();
        let child = TaskScope::new_child(&parent);
        parent.suspend();
        assert!(child.is_suspended());

        parent.resume();
        assert!(!parent.is_suspended());
        assert!(!child.is_suspended());
    }
1535
    /// Fifty suspend/resume cycles toggle the flag correctly each time.
    #[test]
    fn multiple_suspend_resume_no_leak() {
        init();
        let scope = TaskScope::new();
        for _ in 0..50 {
            scope.suspend();
            assert!(scope.is_suspended());
            scope.resume();
            assert!(!scope.is_suspended());
        }
    }
1548
    /// Dropping a suspended scope that owns a pending task does not panic and
    /// leaves no task in the executor.
    #[test]
    fn suspended_scope_drops_without_panic() {
        init();
        {
            let scope = TaskScope::new();
            scope.suspend();
            let d = Rc::new(Cell::new(false));
            // Drop guard held by the task; its flag is not asserted on here.
            struct DropCheck(Rc<Cell<bool>>);
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.0.set(true);
                }
            }
            scope.spawn(async move {
                let _guard = DropCheck(d);
                std::future::pending::<()>().await;
            });
        }
        assert_eq!(executor::debug_task_count(), 0);
    }
1572
    /// Suspending one child affects neither its sibling nor the parent.
    #[test]
    fn siblings_not_affected_by_suspend() {
        init();
        let parent = TaskScope::new();
        let child_a = TaskScope::new_child(&parent);
        let child_b = TaskScope::new_child(&parent);

        child_a.suspend();
        assert!(child_a.is_suspended());
        assert!(!child_b.is_suspended());
        assert!(!parent.is_suspended());
    }
1585
1586 use crate::Executor;
1589
    /// On a dedicated executor instance, a panicking task does not prevent a
    /// second task from running during `flush_instance`.
    #[test]
    fn flush_instance_panicking_task_is_isolated() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));

        let survived = Rc::new(Cell::new(false));
        let s = Rc::clone(&survived);

        Executor::spawn(&ex, async move {
            panic!("intentional test panic in instance executor");
        });
        Executor::spawn(&ex, async move {
            s.set(true);
        });
        Executor::flush_instance(&ex);

        assert!(survived.get());
    }
1609
    /// Twenty tasks spawned on a dedicated executor instance have all run
    /// after `flush_instance`.
    #[test]
    fn flush_instance_spawn_and_complete() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));

        let counter = Rc::new(Cell::new(0u32));
        for _ in 0..20 {
            let c = Rc::clone(&counter);
            Executor::spawn(&ex, async move {
                c.set(c.get() + 1);
            });
        }
        Executor::flush_instance(&ex);
        assert_eq!(counter.get(), 20);
    }
1626
1627 use crate::timer;
1630
    /// A zero-length sleep does not delay the task: it has completed by the
    /// time `spawn_global` returns.
    #[test]
    fn timer_zero_duration_completes_immediately() {
        init();
        let done = Rc::new(Cell::new(false));
        let d = Rc::clone(&done);
        spawn_global(async move {
            timer::sleep(Duration::ZERO).await;
            d.set(true);
        });
        assert!(done.get());
    }
1643
    /// A 100 ms sleep stays pending until the test clock has advanced past
    /// the deadline and the executor has been flushed.
    #[test]
    fn timer_normal_delay_fires_after_time_advances() {
        init();
        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(Rc::clone(&ts) as Rc<dyn TimeSource>);

        let done = Rc::new(Cell::new(false));
        let d = Rc::clone(&done);
        spawn_global(async move {
            timer::sleep(Duration::from_millis(100)).await;
            d.set(true);
        });
        assert!(!done.get());

        ts.advance(150);
        crate::executor::flush_all();
        assert!(done.get());
    }
1665
    /// A task that sleeps 100 ms three times makes exactly one step of
    /// progress per "advance clock by 100, flush" round.
    #[test]
    fn timer_across_multiple_flushes() {
        init();
        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(Rc::clone(&ts) as Rc<dyn TimeSource>);

        let counter = Rc::new(Cell::new(0u32));
        let c = Rc::clone(&counter);
        spawn_global(async move {
            for _ in 0..3 {
                timer::sleep(Duration::from_millis(100)).await;
                c.set(c.get() + 1);
            }
        });
        assert_eq!(counter.get(), 0);

        ts.advance(100);
        crate::executor::flush_all();
        assert_eq!(counter.get(), 1);

        ts.advance(100);
        crate::executor::flush_all();
        assert_eq!(counter.get(), 2);

        ts.advance(100);
        crate::executor::flush_all();
        assert_eq!(counter.get(), 3);
    }
1694
/// Dropping a `TaskScope` whose task is awaiting a timer must prevent the
/// task's continuation from running and leave the executor with zero tasks.
#[test]
fn timer_cancelled_by_scope_drop() {
    init();
    let ran = Rc::new(Cell::new(false));
    let flag = Rc::clone(&ran);

    let scope = TaskScope::new();
    scope.spawn(async move {
        timer::sleep(Duration::from_millis(500)).await;
        flag.set(true);
    });
    // Drop the scope before any clock advance or flush takes place.
    drop(scope);

    assert!(!ran.get());
    assert_eq!(executor::debug_task_count(), 0);
}
1712
/// Subscribes a flag-setting callback to a signal, writes the signal, and
/// asserts the callback has run by the time `set` returns.
// NOTE(review): the test name refers to re-entrant flushing, but nothing
// visible in this body triggers a nested flush explicitly — confirm the
// intended mechanism against `init()` and the signal notification path.
#[test]
fn reentrant_flush_is_noop() {
    init();
    let notified = Rc::new(Cell::new(false));
    let flag = Rc::clone(&notified);
    let sig = Signal::new(0);

    let on_change = Rc::new(move || flag.set(true));
    auralis_signal::subscribe(&sig, on_change);

    sig.set(1);
    assert!(notified.get());
}
1732
/// Same shape as the global timer test, but against an instance executor
/// with its own flush scheduler and its own test time source.
#[test]
fn instance_executor_timer() {
    init();
    let executor = Executor::new_instance();
    Executor::install_flush_scheduler(&executor, Rc::new(TestScheduleFlush));
    let clock = Rc::new(TestTimeSource::new(0));
    let source = Rc::clone(&clock) as Rc<dyn TimeSource>;
    Executor::install_time_source(&executor, source);

    let fired = Rc::new(Cell::new(false));
    let flag = Rc::clone(&fired);
    Executor::spawn(&executor, async move {
        timer::sleep(Duration::from_millis(50)).await;
        flag.set(true);
    });
    // The clock has not moved, so the 50 ms sleep must still be pending.
    assert!(!fired.get());

    // Advance by 60 (past the requested 50) and flush this executor only.
    clock.advance(60);
    Executor::flush_instance(&executor);
    assert!(fired.get());
}
1754
/// Calls `set_deferred` from inside a task spawned on an instance executor
/// and checks the value is visible after that executor has been flushed.
#[test]
fn set_deferred_routes_to_instance_executor() {
    init();
    let executor = Executor::new_instance();
    Executor::install_flush_scheduler(&executor, Rc::new(TestScheduleFlush));

    let target = Signal::new(0);
    let writer = target.clone();
    Executor::spawn(&executor, async move {
        crate::set_deferred(&writer, 42);
    });

    Executor::flush_instance(&executor);
    assert_eq!(target.read(), 42);
}
1774
/// Installs a panic hook that raises a flag, spawns a task that panics, and
/// asserts the flag is set right after `spawn` returns (no explicit flush).
#[test]
fn panic_hook_is_invoked_on_task_panic() {
    init();
    let hook_fired = Rc::new(Cell::new(false));
    let flag = Rc::clone(&hook_fired);

    // The closure stays inline so `_info` takes its type from `set_panic_hook`.
    crate::set_panic_hook(Rc::new(move |_info| {
        flag.set(true);
    }));

    // `scope` is kept alive until the end of the test, past the assertion.
    let scope = TaskScope::new();
    scope.spawn(async move { panic!("intentional") });

    assert!(hook_fired.get());
}
1793
/// Records, from inside a scope-spawned task, whether `current_scope()`
/// returns `Some`, and asserts that it did.
#[test]
fn current_scope_available_in_spawned_task() {
    init();
    let saw_scope = Rc::new(Cell::new(false));
    let flag = Rc::clone(&saw_scope);

    let scope = TaskScope::new();
    scope.spawn(async move {
        flag.set(crate::current_scope().is_some());
    });

    assert!(saw_scope.get());
}
1805
/// Creating and dropping a no-op `CallbackHandle` must not panic.
#[test]
fn callback_handle_noop_does_not_panic() {
    // Explicit drop: the handle is destroyed here, with no cleanup closure.
    drop(crate::CallbackHandle::noop());
}
1811
/// After `reset_executor_for_test()` (note: `init()` is deliberately not
/// called here), a subscriber must still be invoked by the time
/// `Signal::set` returns.
// NOTE(review): presumably the reset leaves no schedule hook installed,
// which is what the test name refers to — confirm against the reset helper.
#[test]
fn sync_callback_fallback_without_schedule_hook() {
    crate::reset_executor_for_test();
    let sig = Signal::new(0);
    let invoked = Rc::new(Cell::new(false));
    let flag = Rc::clone(&invoked);

    let on_change = Rc::new(move || flag.set(true));
    auralis_signal::subscribe(&sig, on_change);

    sig.set(1);
    assert!(invoked.get());
}
1828
/// Issues `set_deferred` while the first of two instance executors is
/// current, flushes only that executor, and checks the write landed on the
/// targeted signal while an unrelated signal kept its initial value.
// NOTE(review): `untouched` is never written and `second` is never flushed,
// so the final assertion only confirms the unrelated signal is unchanged.
#[test]
fn set_deferred_isolated_to_instance_executor() {
    init();
    let first = Executor::new_instance();
    Executor::install_flush_scheduler(&first, Rc::new(TestScheduleFlush));
    let second = Executor::new_instance();
    Executor::install_flush_scheduler(&second, Rc::new(TestScheduleFlush));

    let target = Signal::new(0);
    let untouched = Signal::new(0);
    let writer = target.clone();

    crate::with_executor(&first, || {
        crate::set_deferred(&writer, 42);
    });
    Executor::flush_instance(&first);

    assert_eq!(target.read(), 42);
    assert_eq!(untouched.read(), 0);
}
1850
/// A subscriber that writes the signal again during its first invocation
/// must be notified a second time, and the signal must end on the value
/// written from inside the callback.
#[test]
fn notify_signal_state_follow_up_handles_reentrant_dirty() {
    let sig = Signal::new(0);
    let writer = sig.clone();
    let calls = Rc::new(Cell::new(0u32));
    let tally = Rc::clone(&calls);

    let on_change = Rc::new(move || {
        let call_no = tally.get() + 1;
        tally.set(call_no);
        // Only the first notification writes back, so the chain stops.
        if call_no == 1 {
            writer.set(2);
        }
    });
    auralis_signal::subscribe(&sig, on_change);

    sig.set(1);
    assert_eq!(sig.read(), 2);
    assert_eq!(calls.get(), 2);
}
1877}