use std::any::{Any, TypeId};
use std::cell::{Cell, RefCell};
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::rc::{Rc, Weak};
use crate::executor;
use crate::Priority;
/// Monotonically increasing identifier for a task scope.
type ScopeId = u64;
/// Identifier for a task spawned on the executor.
type TaskId = u64;

thread_local! {
    // Per-thread counter; ids start at 1 so 0 never refers to a live scope.
    static NEXT_SCOPE_ID: Cell<ScopeId> = const { Cell::new(1) };
}

/// Hands out the next unused scope id for the current thread.
fn alloc_scope_id() -> ScopeId {
    // `Cell::replace` stores the incremented counter and returns the old id.
    NEXT_SCOPE_ID.with(|counter| counter.replace(counter.get() + 1))
}
/// RAII guard that runs a cleanup closure exactly once when dropped.
///
/// Registering a handle on a scope ties the cleanup to that scope's
/// lifetime: when the scope is torn down the handle is dropped and the
/// closure fires.
pub struct CallbackHandle {
    cleanup: Option<Box<dyn FnOnce() + 'static>>,
}

impl CallbackHandle {
    /// Creates a handle that invokes `cleanup` when dropped.
    pub fn new(cleanup: impl FnOnce() + 'static) -> Self {
        let boxed: Box<dyn FnOnce() + 'static> = Box::new(cleanup);
        Self {
            cleanup: Some(boxed),
        }
    }

    /// Creates a handle that performs no cleanup on drop.
    #[must_use]
    pub fn noop() -> Self {
        Self { cleanup: None }
    }
}

impl Drop for CallbackHandle {
    fn drop(&mut self) {
        // `take` moves the closure out, guaranteeing it can only run once.
        match self.cleanup.take() {
            Some(run) => run(),
            None => {}
        }
    }
}
/// Registry entry: weak handles to a scope's shared state and its
/// suspension flag. Weak so the registry never keeps a dropped scope alive.
type ScopeRegistryEntry = (Weak<RefCell<TaskScopeInner>>, Weak<Cell<bool>>);
thread_local! {
    /// Thread-local index of live scopes by id, used by `find_scope` to
    /// resolve a `ScopeId` back into a `TaskScope`.
    static SCOPE_REGISTRY: RefCell<HashMap<ScopeId, ScopeRegistryEntry>> =
        RefCell::new(HashMap::new());
}
/// Records `id` in the thread-local scope registry, holding only weak refs.
///
/// Best-effort: silently does nothing if the thread-local has been torn
/// down or the registry is already borrowed.
fn register_scope(id: ScopeId, inner: &Rc<RefCell<TaskScopeInner>>, suspended: &Rc<Cell<bool>>) {
    let entry = (Rc::downgrade(inner), Rc::downgrade(suspended));
    let _ = SCOPE_REGISTRY.try_with(|registry| {
        if let Ok(mut map) = registry.try_borrow_mut() {
            map.insert(id, entry);
        }
    });
}
/// Drops `id` from the thread-local scope registry (best-effort; a missing
/// thread-local or an active borrow is silently ignored).
fn unregister_scope(id: ScopeId) {
    let _ = SCOPE_REGISTRY.try_with(|registry| {
        if let Ok(mut map) = registry.try_borrow_mut() {
            map.remove(&id);
        }
    });
}
/// Resolves a `ScopeId` into a live [`TaskScope`], if one is registered on
/// this thread and both of its weak handles can still be upgraded.
#[must_use]
pub fn find_scope(scope_id: ScopeId) -> Option<TaskScope> {
    SCOPE_REGISTRY
        .try_with(|registry| {
            // `?` short-circuits the closure to `None` on any failure.
            let map = registry.try_borrow().ok()?;
            let (inner_weak, suspended_weak) = map.get(&scope_id)?;
            Some(TaskScope {
                inner: inner_weak.upgrade()?,
                suspended: suspended_weak.upgrade()?,
            })
        })
        .ok()
        .flatten()
}
/// Returns the debug label of the scope with `scope_id`, if the scope is
/// alive and a label was set via `set_debug_label`.
#[cfg(feature = "debug")]
#[doc(hidden)]
#[must_use]
pub fn scope_debug_label(scope_id: ScopeId) -> Option<String> {
    let scope = find_scope(scope_id)?;
    let label = scope.inner.borrow().debug_label.clone();
    label
}
/// Test helper: wipes every entry from this thread's scope registry.
#[doc(hidden)]
pub fn clear_scope_registry() {
    let _ = SCOPE_REGISTRY.try_with(|registry| {
        if let Ok(mut map) = registry.try_borrow_mut() {
            map.clear();
        }
    });
}
/// Function pointer used to install the current scope.
type ScopeSetFn = fn(Option<TaskScope>);
/// Function pointer used to read the current scope.
type ScopeGetFn = fn() -> Option<TaskScope>;
/// Pluggable backend for tracking the "current scope": thread-local by
/// default, swappable (e.g. for a tokio task-local under `ssr-tokio`).
pub struct ScopeStore {
    pub set_fn: ScopeSetFn,
    pub get_fn: ScopeGetFn,
}
use std::sync::OnceLock;
/// Process-wide store slot; written at most once (first writer wins).
static SCOPE_STORE: OnceLock<ScopeStore> = OnceLock::new();
/// Returns the installed [`ScopeStore`], lazily installing the
/// thread-local default on first use.
fn ensure_default_store() -> &'static ScopeStore {
    SCOPE_STORE.get_or_init(|| {
        let default_store = ScopeStore {
            set_fn: thread_local_set,
            get_fn: thread_local_get,
        };
        default_store
    })
}
/// Installs a custom scope store (e.g. the tokio task-local backend).
///
/// First-wins: if a store is already installed — including the thread-local
/// default that `ensure_default_store` lazily installs on the first scope
/// operation — this call is silently ignored. Call it before any scope API
/// is used.
pub fn set_scope_store(store: ScopeStore) {
    let _ = SCOPE_STORE.set(store);
}
thread_local! {
    /// Default backing slot for the "current scope" when no custom
    /// [`ScopeStore`] has been installed.
    static CURRENT_SCOPE: RefCell<Option<TaskScope>> = const { RefCell::new(None) };
}

/// Default `set_fn`: stores `scope` in the thread-local slot.
fn thread_local_set(scope: Option<TaskScope>) {
    CURRENT_SCOPE.with(|slot| {
        *slot.borrow_mut() = scope;
    });
}

/// Default `get_fn`: clones the scope out of the thread-local slot.
fn thread_local_get() -> Option<TaskScope> {
    CURRENT_SCOPE.with(|slot| slot.borrow().clone())
}
/// Crate-internal setter for the current scope, routed through the
/// installed [`ScopeStore`].
pub(crate) fn set_scope_direct(scope: Option<TaskScope>) {
    (ensure_default_store().set_fn)(scope);
}
/// Crate-internal getter for the current scope, routed through the
/// installed [`ScopeStore`].
pub(crate) fn get_scope_direct() -> Option<TaskScope> {
    (ensure_default_store().get_fn)()
}
#[cfg(feature = "ssr-tokio")]
/// Installs a tokio task-local backend for the current scope, so each tokio
/// task tracks its own scope instead of sharing one thread-local slot.
///
/// Must run before any scope API is used: `set_scope_store` is first-wins,
/// so a previously installed (or lazily defaulted) store makes this a no-op.
pub fn init_scope_store_tokio() {
    tokio::task_local! {
        static TK_SCOPE: std::cell::RefCell<Option<TaskScope>>;
    }
    // Best-effort reset; `try_with` fails harmlessly when the task-local is
    // not set for the current task.
    let _ = TK_SCOPE.try_with(|cell| {
        cell.replace(None);
    });
    set_scope_store(ScopeStore {
        // Both accessors are best-effort: outside a task that carries
        // TK_SCOPE, the set is dropped and the get returns None.
        set_fn: |s| {
            let _ = TK_SCOPE.try_with(|cell| {
                cell.replace(s);
            });
        },
        get_fn: || {
            TK_SCOPE
                .try_with(|cell| cell.borrow().clone())
                .ok()
                .flatten()
        },
    });
}
/// Runs `f` with `scope` installed as the current scope, then restores the
/// previously current scope.
///
/// The previous scope is reinstated via a drop guard, so it is restored
/// even when `f` unwinds. (Task panics are caught and isolated elsewhere in
/// this crate; without the guard, an unwinding `f` would leave a stale
/// "current scope" installed.)
pub fn with_current_scope<R>(scope: &TaskScope, f: impl FnOnce() -> R) -> R {
    /// Drop guard that reinstates the saved scope on return or unwind.
    struct RestoreScope {
        prev: Option<TaskScope>,
        set_fn: fn(Option<TaskScope>),
    }
    impl Drop for RestoreScope {
        fn drop(&mut self) {
            (self.set_fn)(self.prev.take());
        }
    }
    let store = ensure_default_store();
    let _restore = RestoreScope {
        prev: (store.get_fn)(),
        set_fn: store.set_fn,
    };
    (store.set_fn)(Some(scope.clone_inner()));
    f()
}
/// Returns a clone of the currently installed scope, if any.
#[must_use]
pub fn current_scope() -> Option<TaskScope> {
    (ensure_default_store().get_fn)()
}
/// Shared state behind a [`TaskScope`]. Held in `Rc<RefCell<_>>` so every
/// clone of a scope is a handle to the same logical scope.
struct TaskScopeInner {
    /// Unique id; key into the thread-local scope registry.
    id: ScopeId,
    /// Ids of tasks spawned through this scope; cancelled on teardown.
    task_ids: Vec<TaskId>,
    /// Strong handles to child scopes, torn down with this scope.
    children: Vec<TaskScope>,
    /// Weak back-link used by `consume` to walk up the scope tree.
    parent: Option<Weak<RefCell<TaskScopeInner>>>,
    /// Type-keyed context map backing `provide`/`consume`.
    context: RefCell<HashMap<TypeId, Rc<dyn Any>>>,
    /// Cleanup handles dropped (and thus run) when the scope is cancelled.
    callbacks: RefCell<Vec<CallbackHandle>>,
    #[cfg(feature = "debug")]
    /// Optional human-readable label for diagnostics.
    debug_label: Option<String>,
    /// Set once during teardown; `spawn` and callback registration become
    /// no-ops afterwards.
    cancelled: bool,
    /// Executor this scope's tasks run on.
    executor: executor::ExecutorRef,
}
/// A cancellation scope for tasks, cleanup callbacks, and typed context.
///
/// Cloning yields another handle to the same scope. When the last handle is
/// dropped, the scope cancels its tasks, runs its callback cleanups, and
/// tears down all descendant scopes (see the `Drop` impl).
#[must_use]
pub struct TaskScope {
    inner: Rc<RefCell<TaskScopeInner>>,
    /// Shared suspension flag; a weak handle to it is also kept in the
    /// thread-local scope registry.
    suspended: Rc<Cell<bool>>,
}
impl TaskScope {
    /// Creates a root scope bound to the current thread's executor instance.
    pub fn new() -> Self {
        Self::with_executor(&executor::current_executor_instance())
    }

    /// Creates a root scope bound to the given executor.
    pub fn with_executor(ex: &executor::ExecutorRef) -> Self {
        let inner = Rc::new(RefCell::new(TaskScopeInner {
            id: alloc_scope_id(),
            task_ids: Vec::new(),
            children: Vec::new(),
            parent: None,
            context: RefCell::new(HashMap::new()),
            callbacks: RefCell::new(Vec::new()),
            #[cfg(feature = "debug")]
            debug_label: None,
            cancelled: false,
            executor: Rc::clone(ex),
        }));
        let id = inner.borrow().id;
        let suspended = Rc::new(Cell::new(false));
        // Make the scope discoverable via `find_scope`; unregistered on drop.
        register_scope(id, &inner, &suspended);
        Self { inner, suspended }
    }

    /// Creates a child scope that inherits `parent`'s executor and is torn
    /// down when `parent` is dropped.
    pub fn new_child(parent: &Self) -> Self {
        let ex = parent.inner.borrow().executor.clone();
        let inner = Rc::new(RefCell::new(TaskScopeInner {
            id: alloc_scope_id(),
            task_ids: Vec::new(),
            children: Vec::new(),
            // Weak back-link: the child must not keep the parent alive.
            parent: Some(Rc::downgrade(&parent.inner)),
            context: RefCell::new(HashMap::new()),
            callbacks: RefCell::new(Vec::new()),
            #[cfg(feature = "debug")]
            debug_label: None,
            cancelled: false,
            executor: ex,
        }));
        let id = inner.borrow().id;
        let suspended = Rc::new(Cell::new(false));
        register_scope(id, &inner, &suspended);
        let child = Self { inner, suspended };
        // The parent holds a strong handle, tying the child's lifetime to it.
        parent.inner.borrow_mut().children.push(child.clone_inner());
        child
    }

    /// Spawns a task on this scope at [`Priority::Low`].
    pub fn spawn(&self, future: impl Future<Output = ()> + 'static) {
        self.spawn_with_priority(Priority::Low, future);
    }

    /// Spawns a task on this scope's executor, with this scope installed as
    /// the current scope while the executor accepts the future.
    ///
    /// No-op if the scope is already cancelled. The returned task id is
    /// recorded so the task can be cancelled when the scope is dropped.
    pub fn spawn_with_priority(
        &self,
        priority: Priority,
        future: impl Future<Output = ()> + 'static,
    ) {
        let inner = self.inner.borrow();
        if inner.cancelled {
            return;
        }
        let ex = Rc::clone(&inner.executor);
        let task_id = executor::with_executor(&ex, || {
            with_current_scope(self, || {
                executor::spawn_scoped_on(&ex, priority, inner.id, future)
            })
        });
        // Release the shared borrow before mutably re-borrowing below.
        drop(inner);
        self.inner.borrow_mut().task_ids.push(task_id);
    }

    /// Ties `handle`'s cleanup to this scope: the handle is dropped (running
    /// its cleanup) when the scope is torn down. If the scope is already
    /// cancelled, the handle is dropped — and its cleanup runs — immediately.
    pub fn register_callback_handle(&self, handle: CallbackHandle) {
        let inner = self.inner.borrow();
        if inner.cancelled {
            return;
        }
        inner.callbacks.borrow_mut().push(handle);
    }

    /// Stores `value` in this scope's context, keyed by its type.
    /// Overwrites any previously provided value of the same type.
    pub fn provide<T: 'static>(&self, value: T) {
        self.inner
            .borrow()
            .context
            .borrow_mut()
            .insert(TypeId::of::<T>(), Rc::new(value));
    }

    /// Looks up a context value of type `T`, walking up through live parent
    /// scopes until one is found. Returns `None` if no scope in the chain
    /// provides a `T`.
    #[must_use]
    pub fn consume<T: 'static>(&self) -> Option<Rc<T>> {
        let mut current = Some(Rc::clone(&self.inner));
        while let Some(inner) = current {
            {
                let inner_ref = inner.borrow();
                let ctx = inner_ref.context.borrow();
                if let Some(val) = ctx.get(&TypeId::of::<T>()) {
                    if let Ok(downcast) = val.clone().downcast::<T>() {
                        return Some(downcast);
                    }
                }
            }
            // Step to the parent, if it is still alive.
            let parent = {
                let inner_ref = inner.borrow();
                inner_ref.parent.as_ref().and_then(Weak::upgrade)
            };
            current = parent;
        }
        None
    }

    /// Like [`consume`](Self::consume), but panics with the missing type's
    /// name instead of returning `None`.
    ///
    /// # Panics
    /// Panics when no scope in the chain provides a `T`.
    #[must_use]
    #[track_caller]
    pub fn expect_context<T: 'static>(&self) -> Rc<T> {
        self.consume::<T>()
            .unwrap_or_else(|| panic!("context not found: {}", std::any::type_name::<T>()))
    }

    /// True once the scope has been torn down.
    #[must_use]
    pub fn is_cancelled(&self) -> bool {
        self.inner.borrow().cancelled
    }

    /// Sets a human-readable label for diagnostics (`debug` feature only).
    #[cfg(feature = "debug")]
    pub fn set_debug_label(&self, label: impl Into<String>) {
        self.inner.borrow_mut().debug_label = Some(label.into());
    }

    /// Number of task ids recorded on this scope (test-only).
    #[cfg(test)]
    #[must_use]
    pub fn task_count(&self) -> usize {
        self.inner.borrow().task_ids.len()
    }

    /// Number of direct child scopes (test-only).
    #[cfg(test)]
    #[must_use]
    pub fn child_count(&self) -> usize {
        self.inner.borrow().children.len()
    }

    /// Cheap handle clone: shares the inner state and suspension flag.
    fn clone_inner(&self) -> Self {
        Self {
            inner: Rc::clone(&self.inner),
            suspended: Rc::clone(&self.suspended),
        }
    }

    /// Runs `f` with this scope installed as the current scope.
    pub fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
        with_current_scope(self, f)
    }

    /// Suspends this scope and, recursively, all of its children.
    /// Idempotent: suspending an already-suspended scope is a no-op.
    pub fn suspend(&self) {
        if self.suspended.get() {
            return;
        }
        self.suspended.set(true);
        // Clone the child handles out first so the `inner` borrow is not
        // held while recursing into children.
        let children: Vec<TaskScope> = {
            self.inner
                .borrow()
                .children
                .iter()
                .map(TaskScope::clone_inner)
                .collect()
        };
        for child in &children {
            child.suspend();
        }
    }

    /// Resumes this scope — re-enqueueing its tasks on the executor — and,
    /// recursively, all of its children. No-op if not suspended.
    pub fn resume(&self) {
        if !self.suspended.get() {
            return;
        }
        self.suspended.set(false);
        let (scope_id, children) = {
            let inner = self.inner.borrow();
            let id = inner.id;
            let children: Vec<TaskScope> =
                inner.children.iter().map(TaskScope::clone_inner).collect();
            (id, children)
        };
        let ex = Rc::clone(&self.inner.borrow().executor);
        executor::enqueue_scope_tasks_on(&ex, scope_id);
        for child in &children {
            child.resume();
        }
    }

    /// True while the scope is suspended.
    #[must_use]
    pub fn is_suspended(&self) -> bool {
        self.suspended.get()
    }
}
impl Default for TaskScope {
    /// Equivalent to [`TaskScope::new`]: a root scope on the current
    /// executor instance.
    fn default() -> Self {
        Self::new()
    }
}
impl Clone for TaskScope {
    /// Returns another handle to the *same* scope (shared inner state and
    /// suspension flag); this does not create a child scope.
    fn clone(&self) -> Self {
        self.clone_inner()
    }
}
impl Drop for TaskScope {
    fn drop(&mut self) {
        // Only the last remaining handle performs teardown; earlier clones
        // are inert.
        if Rc::strong_count(&self.inner) > 1 {
            return;
        }
        // If `inner` is already borrowed we cannot tear down safely; warn in
        // debug builds and leak rather than panic inside a destructor.
        let Ok(mut inner) = self.inner.try_borrow_mut() else {
            #[cfg(debug_assertions)]
            {
                eprintln!(
                    "[auralis-task] WARNING: TaskScope::drop cannot borrow inner \
                    (already borrowed). If this was the last clone, tasks and \
                    callbacks will leak. Avoid dropping the last TaskScope clone \
                    inside a callback or during executor flush."
                );
            }
            return;
        };
        // Teardown is idempotent.
        if inner.cancelled {
            return;
        }
        inner.cancelled = true;
        // Callback cleanups run *before* task futures are dropped — callers
        // rely on this ordering.
        inner.callbacks.borrow_mut().clear();
        // Collect every descendant iteratively (breadth-first) instead of
        // recursing through nested Drop impls, so deep scope trees cannot
        // overflow the stack.
        let mut descendants: Vec<Rc<RefCell<TaskScopeInner>>> = Vec::new();
        {
            let mut queue: VecDeque<Rc<RefCell<TaskScopeInner>>> = VecDeque::new();
            for child in &inner.children {
                queue.push_back(Rc::clone(&child.inner));
            }
            while let Some(scope_rc) = queue.pop_front() {
                let scope = scope_rc.borrow();
                for child in &scope.children {
                    queue.push_back(Rc::clone(&child.inner));
                }
                descendants.push(Rc::clone(&scope_rc));
            }
        }
        // Reverse BFS order: deepest scopes are cancelled before their
        // parents.
        for scope_rc in descendants.iter().rev() {
            let mut scope = scope_rc.borrow_mut();
            if scope.cancelled {
                continue;
            }
            scope.cancelled = true;
            scope.callbacks.borrow_mut().clear();
            if !scope.task_ids.is_empty() {
                let ex = Rc::clone(&scope.executor);
                // Take ownership of the cancelled futures and drop them here,
                // after the executor has handed them back.
                let dropped_futures: Vec<Pin<Box<dyn Future<Output = ()>>>> =
                    executor::cancel_scope_tasks_on(&ex, scope.id);
                drop(dropped_futures);
            }
            scope.context.borrow_mut().clear();
            unregister_scope(scope.id);
        }
        // Finally tear down this scope's own tasks, context, and
        // registration.
        if !inner.task_ids.is_empty() {
            let ex = Rc::clone(&inner.executor);
            let dropped_futures = executor::cancel_scope_tasks_on(&ex, inner.id);
            drop(dropped_futures);
        }
        inner.context.borrow_mut().clear();
        inner.children.clear();
        unregister_scope(inner.id);
    }
}
/// Provides `$value` as typed context on `$scope`; shorthand for
/// [`TaskScope::provide`].
#[macro_export]
macro_rules! provide_context {
    ($scope:expr, $value:expr) => {
        $scope.provide($value)
    };
}
/// Looks up typed context of type `$ty` on `$scope` (walking up parents);
/// shorthand for [`TaskScope::consume`].
#[macro_export]
macro_rules! consume_context {
    ($scope:expr, $ty:ty) => {
        $scope.consume::<$ty>()
    };
}
#[cfg(test)]
#[allow(clippy::items_after_statements)]
mod tests {
    // Covers scope lifecycle, context, suspension, timers, and executor
    // interplay. `init()` installs a synchronous flush scheduler, so most
    // spawned tasks are driven immediately.
    use super::*;
    use crate::executor::{self, init_flush_scheduler, reset_executor_for_test, TestScheduleFlush};
    use crate::timer;
    use crate::Executor;
    use crate::{init_time_source, ScheduleFlush, TestTimeSource, TimeSource};
    use crate::{set_deferred, spawn_global};
    use auralis_signal::Signal;
    use std::cell::{Cell, RefCell};
    use std::rc::Rc;
    use std::time::Duration;

    /// Fresh executor + synchronous flush scheduler for each test.
    fn init() {
        reset_executor_for_test();
        init_flush_scheduler(Rc::new(TestScheduleFlush));
    }

    #[test]
    fn new_scope_has_zero_tasks() {
        let scope = TaskScope::new();
        assert_eq!(scope.task_count(), 0);
        assert_eq!(scope.child_count(), 0);
    }

    #[test]
    fn new_child_attaches_to_parent() {
        let parent = TaskScope::new();
        let _child = TaskScope::new_child(&parent);
        assert_eq!(parent.child_count(), 1);
    }

    #[test]
    fn spawn_adds_task() {
        init();
        let scope = TaskScope::new();
        scope.spawn(async {});
        assert_eq!(scope.task_count(), 1);
    }

    #[test]
    fn spawn_and_complete() {
        init();
        let done = Rc::new(Cell::new(false));
        let done2 = Rc::clone(&done);
        spawn_global(async move {
            done2.set(true);
        });
        assert!(done.get());
    }

    #[test]
    fn scope_spawn_and_cancel() {
        init();
        let dropped = Rc::new(Cell::new(false));
        {
            let scope = TaskScope::new();
            let d = Rc::clone(&dropped);
            struct DropCheck(Rc<Cell<bool>>);
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.0.set(true);
                }
            }
            scope.spawn(async move {
                let _guard = DropCheck(d);
                std::future::pending::<()>().await;
            });
            assert_eq!(executor::debug_task_count(), 1);
        }
        assert!(dropped.get());
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn nested_scope_child_cancel_with_parent() {
        init();
        let dropped_child = Rc::new(Cell::new(false));
        {
            let parent = TaskScope::new();
            let child = TaskScope::new_child(&parent);
            let d = Rc::clone(&dropped_child);
            struct DropCheck(Rc<Cell<bool>>);
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.0.set(true);
                }
            }
            child.spawn(async move {
                let _guard = DropCheck(d);
                std::future::pending::<()>().await;
            });
            assert_eq!(executor::debug_task_count(), 1);
        }
        assert!(dropped_child.get());
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn deeply_nested_scope_drop_no_stack_overflow() {
        init();
        let root = TaskScope::new();
        {
            let mut current = TaskScope::new_child(&root);
            for _ in 0..199 {
                current = TaskScope::new_child(&current);
            }
        }
        drop(root);
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn scope_child_explicit_tree() {
        let root = TaskScope::new();
        let a = TaskScope::new_child(&root);
        let b = TaskScope::new_child(&root);
        let _a1 = TaskScope::new_child(&a);
        let _a2 = TaskScope::new_child(&a);
        assert_eq!(root.child_count(), 2);
        assert_eq!(a.child_count(), 2);
        assert_eq!(b.child_count(), 0);
    }

    #[test]
    fn callback_handle_dropped_before_tasks() {
        init();
        let dropped_order: Rc<RefCell<Vec<String>>> = Rc::new(RefCell::new(Vec::new()));
        {
            let scope = TaskScope::new();
            let order1 = Rc::clone(&dropped_order);
            scope.register_callback_handle(CallbackHandle::new(move || {
                order1.borrow_mut().push("callback".to_string());
            }));
            let order2 = Rc::clone(&dropped_order);
            struct DropCheck {
                order: Rc<RefCell<Vec<String>>>,
                label: String,
            }
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.order.borrow_mut().push(self.label.clone());
                }
            }
            scope.spawn(async move {
                let _guard = DropCheck {
                    order: order2,
                    label: "task".to_string(),
                };
                std::future::pending::<()>().await;
            });
        }
        let order = dropped_order.borrow().clone();
        assert_eq!(order, vec!["callback", "task"]);
    }

    #[test]
    fn callback_handle_cleaned_up_on_child_scope_drop() {
        init();
        let called = Rc::new(Cell::new(false));
        {
            let parent = TaskScope::new();
            let child = TaskScope::new_child(&parent);
            let c = Rc::clone(&called);
            child.register_callback_handle(CallbackHandle::new(move || {
                c.set(true);
            }));
        }
        assert!(called.get());
    }

    #[test]
    fn context_provide_and_consume_in_same_scope() {
        let scope = TaskScope::new();
        scope.provide(42i32);
        assert_eq!(*scope.consume::<i32>().unwrap(), 42);
    }

    #[test]
    fn context_consume_walks_up_to_parent() {
        let parent = TaskScope::new();
        parent.provide("hello".to_string());
        let child = TaskScope::new_child(&parent);
        assert_eq!(*child.consume::<String>().unwrap(), "hello");
    }

    #[test]
    fn context_consume_not_found() {
        let scope = TaskScope::new();
        assert!(scope.consume::<i32>().is_none());
    }

    #[test]
    fn context_removed_on_scope_drop() {
        let parent = TaskScope::new();
        parent.provide(99u32);
        {
            let _child = TaskScope::new_child(&parent);
        }
        assert_eq!(*parent.consume::<u32>().unwrap(), 99);
    }

    #[test]
    fn context_shadowing() {
        let parent = TaskScope::new();
        parent.provide(1i32);
        let child = TaskScope::new_child(&parent);
        child.provide(2i32);
        assert_eq!(*child.consume::<i32>().unwrap(), 2);
        assert_eq!(*parent.consume::<i32>().unwrap(), 1);
    }

    #[test]
    #[should_panic(expected = "context not found")]
    fn expect_context_panics_when_missing() {
        let scope = TaskScope::new();
        let _ = scope.expect_context::<String>();
    }

    #[test]
    fn executor_priority_ordering() {
        init();
        let order = Rc::new(RefCell::new(Vec::new()));
        let o1 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::Low, async move {
            o1.borrow_mut().push("low");
        });
        let o2 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::High, async move {
            o2.borrow_mut().push("high");
        });
        executor::flush_all();
        let result = order.borrow().clone();
        assert_eq!(result, vec!["high", "low"]);
    }

    #[test]
    fn executor_batch() {
        init();
        let counter = Rc::new(Cell::new(0u32));
        for _ in 0..10 {
            let c = Rc::clone(&counter);
            spawn_global(async move {
                c.set(c.get() + 1);
            });
        }
        assert_eq!(counter.get(), 10);
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn no_leak_on_cancel() {
        init();
        for _ in 0..50 {
            let scope = TaskScope::new();
            for _ in 0..5 {
                scope.spawn(std::future::pending::<()>());
            }
        }
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn set_deferred_triggers_after_flush() {
        init();
        let sig = Signal::new(0);
        let observed = Rc::new(Cell::new(0));
        set_deferred(&sig, 42);
        assert_eq!(sig.read(), 42);
        let ob1 = Rc::clone(&observed);
        spawn_global(async move {
            ob1.set(sig.read());
        });
        assert_eq!(observed.get(), 42);
    }

    #[test]
    fn set_deferred_in_drop_safe() {
        init();
        let sig = Signal::new(0);
        struct SetOnDrop {
            sig: Signal<i32>,
            val: i32,
        }
        impl Drop for SetOnDrop {
            fn drop(&mut self) {
                set_deferred(&self.sig, self.val);
            }
        }
        let guard = SetOnDrop {
            sig: sig.clone(),
            val: 99,
        };
        drop(guard);
        assert_eq!(sig.read(), 99);
    }

    #[test]
    fn set_deferred_from_drop_guard_during_scope_cancel() {
        init();
        let sig = Signal::new(0i32);
        struct ResetOnDrop {
            sig: Signal<i32>,
        }
        impl Drop for ResetOnDrop {
            fn drop(&mut self) {
                set_deferred(&self.sig, 42);
            }
        }
        {
            let scope = TaskScope::new();
            let s = sig.clone();
            scope.spawn(async move {
                let _guard = ResetOnDrop { sig: s };
                std::future::pending::<()>().await;
            });
        }
        assert_eq!(
            sig.read(),
            42,
            "set_deferred should have fired after scope cancel"
        );
    }

    #[test]
    fn yield_now_gives_other_tasks_a_turn() {
        init();
        let order = Rc::new(RefCell::new(Vec::new()));
        let o1 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::Low, async move {
            o1.borrow_mut().push("a1");
            executor::yield_now().await;
            o1.borrow_mut().push("a2");
        });
        let o2 = Rc::clone(&order);
        executor::spawn_no_auto_flush(Priority::Low, async move {
            o2.borrow_mut().push("b1");
            o2.borrow_mut().push("b2");
        });
        executor::flush_all();
        let r = order.borrow().clone();
        assert_eq!(&r[0..3], &["a1", "b1", "b2"][..]);
        assert!(r.contains(&"a2"));
    }

    #[test]
    fn panic_in_task_is_isolated() {
        init();
        let survived = Rc::new(Cell::new(false));
        let s = Rc::clone(&survived);
        spawn_global(async move {
            panic!("intentional test panic");
        });
        spawn_global(async move {
            s.set(true);
        });
        assert!(survived.get());
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn time_budget_with_test_time_source() {
        init();
        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(ts.clone());
        let polled = Rc::new(Cell::new(0u32));
        for _ in 0..50 {
            let pc = Rc::clone(&polled);
            let ts_c = Rc::clone(&ts);
            executor::spawn_no_auto_flush(Priority::Low, async move {
                pc.set(pc.get() + 1);
                ts_c.advance(1);
            });
        }
        executor::flush_all();
        assert_eq!(polled.get(), 50);
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn time_budget_honoured_with_split() {
        // Counts how many follow-up flushes were scheduled without running
        // them, so a budget split is observable.
        let schedule_count = Rc::new(Cell::new(0u32));
        struct NoopScheduleFlush(Rc<Cell<u32>>);
        impl ScheduleFlush for NoopScheduleFlush {
            fn schedule(&self, _callback: Box<dyn FnOnce()>) {
                self.0.set(self.0.get() + 1);
            }
        }
        init_flush_scheduler(Rc::new(NoopScheduleFlush(Rc::clone(&schedule_count))));
        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(ts.clone());
        let polled = Rc::new(RefCell::new(Vec::new()));
        for i in 0..50u32 {
            let pc = Rc::clone(&polled);
            let ts_c = Rc::clone(&ts);
            executor::spawn_no_auto_flush(Priority::Low, async move {
                pc.borrow_mut().push(i);
                ts_c.advance(1);
            });
        }
        executor::flush_all();
        let completed = polled.borrow().len();
        assert!(
            completed < 50,
            "budget should split before all tasks run (only {completed} of 50)"
        );
        assert!(
            completed >= 7,
            "at least 7 tasks should run before budget expires ({completed})"
        );
        assert_eq!(
            schedule_count.get(),
            1,
            "next flush should have been scheduled exactly once"
        );
        init_flush_scheduler(Rc::new(TestScheduleFlush));
        executor::flush_all();
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn provide_context_macro_works() {
        let scope = TaskScope::new();
        provide_context!(scope, 42i32);
        assert_eq!(*scope.consume::<i32>().unwrap(), 42);
    }

    #[test]
    fn consume_context_macro_works() {
        let scope = TaskScope::new();
        scope.provide(99u32);
        let val: Option<Rc<u32>> = consume_context!(scope, u32);
        assert_eq!(*val.unwrap(), 99);
    }

    #[test]
    fn consume_context_macro_not_found() {
        let scope = TaskScope::new();
        let val: Option<Rc<String>> = consume_context!(scope, String);
        assert!(val.is_none());
    }

    #[cfg(feature = "debug")]
    #[test]
    fn dump_task_tree_returns_string() {
        init();
        let scope = TaskScope::new();
        scope.spawn(async { std::future::pending::<()>().await });
        let output = crate::dump_task_tree();
        assert!(output.contains("Auralis Task Tree"));
        assert!(output.contains("Total active tasks: 1"));
        assert!(output.contains("Scope"));
    }

    #[cfg(feature = "debug")]
    #[test]
    fn dump_task_tree_empty() {
        init();
        let output = crate::dump_task_tree();
        assert!(output.contains("(no active tasks)"));
    }

    #[test]
    fn suspend_prevents_task_execution() {
        init();
        let scope = TaskScope::new();
        let executed = Rc::new(Cell::new(false));
        let ex = Rc::clone(&executed);
        scope.spawn(async move {
            ex.set(true);
        });
        assert!(executed.get());
        executed.set(false);
        scope.suspend();
        let ex2 = Rc::clone(&executed);
        scope.spawn(async move {
            ex2.set(true);
        });
        assert!(!executed.get());
    }

    #[test]
    fn resume_allows_task_execution() {
        init();
        let scope = TaskScope::new();
        scope.suspend();
        let executed = Rc::new(Cell::new(false));
        let ex = Rc::clone(&executed);
        scope.spawn(async move {
            ex.set(true);
        });
        assert!(!executed.get());
        scope.resume();
        assert!(executed.get());
    }

    #[test]
    fn suspend_cascades_to_children() {
        init();
        let parent = TaskScope::new();
        let child = TaskScope::new_child(&parent);
        assert!(!child.is_suspended());
        parent.suspend();
        assert!(parent.is_suspended());
        assert!(child.is_suspended());
    }

    #[test]
    fn resume_cascades_to_children() {
        init();
        let parent = TaskScope::new();
        let child = TaskScope::new_child(&parent);
        parent.suspend();
        assert!(child.is_suspended());
        parent.resume();
        assert!(!parent.is_suspended());
        assert!(!child.is_suspended());
    }

    #[test]
    fn multiple_suspend_resume_no_leak() {
        init();
        let scope = TaskScope::new();
        for _ in 0..50 {
            scope.suspend();
            assert!(scope.is_suspended());
            scope.resume();
            assert!(!scope.is_suspended());
        }
    }

    #[test]
    fn suspended_scope_drops_without_panic() {
        init();
        {
            let scope = TaskScope::new();
            scope.suspend();
            let d = Rc::new(Cell::new(false));
            struct DropCheck(Rc<Cell<bool>>);
            impl Drop for DropCheck {
                fn drop(&mut self) {
                    self.0.set(true);
                }
            }
            scope.spawn(async move {
                let _guard = DropCheck(d);
                std::future::pending::<()>().await;
            });
        }
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn siblings_not_affected_by_suspend() {
        init();
        let parent = TaskScope::new();
        let child_a = TaskScope::new_child(&parent);
        let child_b = TaskScope::new_child(&parent);
        child_a.suspend();
        assert!(child_a.is_suspended());
        assert!(!child_b.is_suspended());
        assert!(!parent.is_suspended());
    }

    #[test]
    fn flush_instance_panicking_task_is_isolated() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));
        let survived = Rc::new(Cell::new(false));
        let s = Rc::clone(&survived);
        Executor::spawn(&ex, async move {
            panic!("intentional test panic in instance executor");
        });
        Executor::spawn(&ex, async move {
            s.set(true);
        });
        Executor::flush_instance(&ex);
        assert!(survived.get());
    }

    #[test]
    fn flush_instance_spawn_and_complete() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));
        let counter = Rc::new(Cell::new(0u32));
        for _ in 0..20 {
            let c = Rc::clone(&counter);
            Executor::spawn(&ex, async move {
                c.set(c.get() + 1);
            });
        }
        Executor::flush_instance(&ex);
        assert_eq!(counter.get(), 20);
    }

    #[test]
    fn timer_zero_duration_completes_immediately() {
        init();
        let done = Rc::new(Cell::new(false));
        let d = Rc::clone(&done);
        spawn_global(async move {
            timer::sleep(Duration::ZERO).await;
            d.set(true);
        });
        assert!(done.get());
    }

    #[test]
    fn timer_normal_delay_fires_after_time_advances() {
        init();
        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(Rc::clone(&ts) as Rc<dyn TimeSource>);
        let done = Rc::new(Cell::new(false));
        let d = Rc::clone(&done);
        spawn_global(async move {
            timer::sleep(Duration::from_millis(100)).await;
            d.set(true);
        });
        assert!(!done.get());
        ts.advance(150);
        crate::executor::flush_all();
        assert!(done.get());
    }

    #[test]
    fn timer_across_multiple_flushes() {
        init();
        let ts = Rc::new(TestTimeSource::new(0));
        init_time_source(Rc::clone(&ts) as Rc<dyn TimeSource>);
        let counter = Rc::new(Cell::new(0u32));
        let c = Rc::clone(&counter);
        spawn_global(async move {
            for _ in 0..3 {
                timer::sleep(Duration::from_millis(100)).await;
                c.set(c.get() + 1);
            }
        });
        assert_eq!(counter.get(), 0);
        ts.advance(100);
        crate::executor::flush_all();
        assert_eq!(counter.get(), 1);
        ts.advance(100);
        crate::executor::flush_all();
        assert_eq!(counter.get(), 2);
        ts.advance(100);
        crate::executor::flush_all();
        assert_eq!(counter.get(), 3);
    }

    #[test]
    fn timer_cancelled_by_scope_drop() {
        init();
        let executed = Rc::new(Cell::new(false));
        let ex = Rc::clone(&executed);
        {
            let scope = TaskScope::new();
            scope.spawn(async move {
                timer::sleep(Duration::from_millis(500)).await;
                ex.set(true);
            });
        }
        assert!(!executed.get());
        assert_eq!(executor::debug_task_count(), 0);
    }

    #[test]
    fn reentrant_flush_is_noop() {
        init();
        let reentered = Rc::new(Cell::new(false));
        let r = Rc::clone(&reentered);
        let sig = Signal::new(0);
        auralis_signal::subscribe(&sig, Rc::new(move || r.set(true)));
        sig.set(1);
        assert!(reentered.get());
    }

    #[test]
    fn instance_executor_timer() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));
        let ts = Rc::new(TestTimeSource::new(0));
        Executor::install_time_source(&ex, Rc::clone(&ts) as Rc<dyn TimeSource>);
        let done = Rc::new(Cell::new(false));
        let d = Rc::clone(&done);
        Executor::spawn(&ex, async move {
            timer::sleep(Duration::from_millis(50)).await;
            d.set(true);
        });
        assert!(!done.get());
        ts.advance(60);
        Executor::flush_instance(&ex);
        assert!(done.get());
    }

    #[test]
    fn set_deferred_routes_to_instance_executor() {
        init();
        let ex = Executor::new_instance();
        Executor::install_flush_scheduler(&ex, Rc::new(TestScheduleFlush));
        let sig = Signal::new(0);
        let s = sig.clone();
        Executor::spawn(&ex, async move {
            crate::set_deferred(&s, 42);
        });
        Executor::flush_instance(&ex);
        assert_eq!(sig.read(), 42);
    }

    #[test]
    fn panic_hook_is_invoked_on_task_panic() {
        init();
        let hook_called = Rc::new(Cell::new(false));
        let hc = Rc::clone(&hook_called);
        crate::set_panic_hook(Rc::new(move |_info| {
            hc.set(true);
        }));
        let scope = TaskScope::new();
        scope.spawn(async move { panic!("intentional") });
        assert!(hook_called.get());
    }

    #[test]
    fn current_scope_available_in_spawned_task() {
        init();
        let scope = TaskScope::new();
        let found = Rc::new(Cell::new(false));
        let f = Rc::clone(&found);
        scope.spawn(async move {
            f.set(crate::current_scope().is_some());
        });
        assert!(found.get());
    }

    #[test]
    fn callback_handle_noop_does_not_panic() {
        let _h = crate::CallbackHandle::noop();
    }

    #[test]
    fn sync_callback_fallback_without_schedule_hook() {
        crate::reset_executor_for_test();
        let sig = Signal::new(0);
        let called = Rc::new(Cell::new(false));
        let c = Rc::clone(&called);
        auralis_signal::subscribe(&sig, Rc::new(move || c.set(true)));
        sig.set(1);
        assert!(called.get());
    }

    #[test]
    fn set_deferred_isolated_to_instance_executor() {
        init();
        let ex1 = Executor::new_instance();
        Executor::install_flush_scheduler(&ex1, Rc::new(TestScheduleFlush));
        let ex2 = Executor::new_instance();
        Executor::install_flush_scheduler(&ex2, Rc::new(TestScheduleFlush));
        let sig1 = Signal::new(0);
        let sig2 = Signal::new(0);
        let s1 = sig1.clone();
        crate::with_executor(&ex1, || {
            crate::set_deferred(&s1, 42);
        });
        Executor::flush_instance(&ex1);
        assert_eq!(sig1.read(), 42);
        assert_eq!(sig2.read(), 0);
    }

    #[test]
    fn notify_signal_state_follow_up_handles_reentrant_dirty() {
        let sig = Signal::new(0);
        let sig2 = sig.clone();
        let count = Rc::new(Cell::new(0u32));
        let c = Rc::clone(&count);
        auralis_signal::subscribe(
            &sig,
            Rc::new(move || {
                c.set(c.get() + 1);
                if c.get() == 1 {
                    sig2.set(2);
                }
            }),
        );
        sig.set(1);
        assert_eq!(sig.read(), 2);
        assert_eq!(count.get(), 2);
    }
}