use alloc::{string::String, vec::Vec};
use bevy_platform::sync::Arc;
use core::{cell::{RefCell, Cell}, future::Future, marker::PhantomData, mem};
use crate::executor::LocalExecutor;
use crate::{block_on, Task};
// Selects the executor that backs `TaskPool::spawn` and `TaskPool::with_local_executor`,
// depending on whether `std` is available. Both branches expose it under the same
// `LOCAL_EXECUTOR` name and the same `Executor` type alias/import.
crate::cfg::std! {
if {
use std::thread_local;
use crate::executor::LocalExecutor as Executor;
// With `std`, every thread gets its own `LocalExecutor` through a thread-local.
thread_local! {
static LOCAL_EXECUTOR: Executor<'static> = const { Executor::new() };
}
} else {
// Without `std` the `thread_local!` macro is not imported, so a single global
// `Executor` static is used instead.
use crate::executor::Executor;
static LOCAL_EXECUTOR: Executor<'static> = const { Executor::new() };
}
}
/// Builder used to create a [`TaskPool`].
///
/// The builder in this file holds no state: each of its configuration methods
/// accepts an argument and returns the builder unchanged.
#[derive(Debug, Default, Clone)]
pub struct TaskPoolBuilder {}
/// Stateless placeholder executor handle.
///
/// The type stores nothing at runtime; it only carries the `'a` lifetime through
/// a [`PhantomData`] marker.
#[derive(Default)]
pub struct ThreadExecutor<'a>(PhantomData<&'a ()>);

impl ThreadExecutor<'_> {
    /// Creates a new `ThreadExecutor`.
    pub fn new() -> Self {
        Self(PhantomData)
    }
}
impl TaskPoolBuilder {
    /// Creates a new, empty builder.
    pub fn new() -> Self {
        Self {}
    }

    /// Accepts a requested thread count and ignores it; the builder is returned unchanged.
    pub fn num_threads(self, _num_threads: usize) -> Self {
        self
    }

    /// Accepts a requested stack size and ignores it; the builder is returned unchanged.
    pub fn stack_size(self, _stack_size: usize) -> Self {
        self
    }

    /// Accepts a requested thread name and ignores it; the builder is returned unchanged.
    pub fn thread_name(self, _thread_name: String) -> Self {
        self
    }

    /// Accepts a thread-spawn callback and ignores it; the callback is never invoked
    /// by this builder.
    pub fn on_thread_spawn(self, _f: impl Fn() + Send + Sync + 'static) -> Self {
        self
    }

    /// Accepts a thread-destroy callback and ignores it; the callback is never invoked
    /// by this builder.
    pub fn on_thread_destroy(self, _f: impl Fn() + Send + Sync + 'static) -> Self {
        self
    }

    /// Consumes the builder and produces a [`TaskPool`].
    pub fn build(self) -> TaskPool {
        TaskPool::new_internal()
    }
}
/// A task pool that carries no state of its own.
///
/// Spawned work is handed to the `LOCAL_EXECUTOR` defined in this file (or to
/// `web_task` on web), and [`TaskPool::thread_num`] always reports `1`.
#[derive(Debug, Default, Clone)]
pub struct TaskPool {}
impl TaskPool {
/// Returns a newly created [`ThreadExecutor`] wrapped in an [`Arc`].
///
/// Every call allocates a fresh instance; nothing is cached.
pub fn get_thread_executor() -> Arc<ThreadExecutor<'static>> {
Arc::new(ThreadExecutor::new())
}
/// Creates a `TaskPool` through a default [`TaskPoolBuilder`].
pub fn new() -> Self {
TaskPoolBuilder::new().build()
}
// Constructor used by `TaskPoolBuilder::build`.
fn new_internal() -> Self {
Self {}
}
/// Returns the number of threads in the pool, which is always `1` for this implementation.
pub fn thread_num(&self) -> usize {
1
}
/// Runs `f` with a [`Scope`] on which futures can be spawned, then blocks until
/// every spawned future has finished.
///
/// Returns the outputs of the spawned futures in the order they were spawned.
///
/// This forwards to [`TaskPool::scope_with_executor`] with `false` and `None`.
pub fn scope<'env, F, T>(&self, f: F) -> Vec<T>
where
F: for<'scope> FnOnce(&'scope mut Scope<'scope, 'env, T>),
T: Send + 'static,
{
self.scope_with_executor(false, None, f)
}
/// Runs `f` with a [`Scope`], drives every future spawned on that scope to
/// completion on a function-local [`LocalExecutor`], and returns the futures'
/// outputs in the order they were spawned.
///
/// `_tick_task_pool_executor` and `_thread_executor` are ignored by this
/// implementation.
///
/// # Panics
///
/// Panics if a result slot is still empty when the results are collected.
#[expect(unsafe_code, reason = "Required to transmute lifetimes.")]
pub fn scope_with_executor<'env, F, T>(
&self,
_tick_task_pool_executor: bool,
_thread_executor: Option<&ThreadExecutor>,
f: F,
) -> Vec<T>
where
F: for<'scope> FnOnce(&'scope mut Scope<'scope, 'env, T>),
T: Send + 'static,
{
// NOTE on all `transmute`s below: each one stretches a reference to a local
// to the caller-chosen `'env` lifetime. This relies on every future spawned
// through the scope having finished before this function returns, which is
// what the `block_on` loop further down waits for. The locals must only be
// reached through the transmuted references for the rest of the function.
let executor = LocalExecutor::new();
// SAFETY: see the note above; the spawned futures are driven to completion
// before `executor` is dropped at the end of this function.
let executor_ref: &'env LocalExecutor<'env> = unsafe { mem::transmute(&executor) };
// One `Option<T>` slot per spawned future, filled in when that future completes.
let results: RefCell<Vec<Option<T>>> = RefCell::new(Vec::new());
// SAFETY: see the note above; `results` outlives every use by a spawned future.
let results_ref: &'env RefCell<Vec<Option<T>>> = unsafe { mem::transmute(&results) };
// Counts futures that were spawned on the scope but have not finished yet.
let pending_tasks: Cell<usize> = Cell::new(0);
// SAFETY: see the note above. The shadowed `Cell` itself stays alive until the
// end of the function; only the name is rebound to the reference.
let pending_tasks: &'env Cell<usize> = unsafe { mem::transmute(&pending_tasks) };
let mut scope = Scope {
executor_ref,
pending_tasks,
results_ref,
scope: PhantomData,
env: PhantomData,
};
// SAFETY: see the note above; `scope` is not touched directly after this point.
let scope_ref: &'env mut Scope<'_, 'env, T> = unsafe { mem::transmute(&mut scope) };
// Let the caller spawn its futures; nothing has been polled yet at this point.
f(scope_ref);
// Run the executor until the pending counter drops to zero. The driver future
// yields on every iteration so the executor can poll the spawned futures.
block_on(executor.run(async {
while pending_tasks.get() != 0 {
futures_lite::future::yield_now().await;
}
}));
// Move the slots out of the `RefCell` and unwrap each one, preserving spawn order.
results
.take()
.into_iter()
.map(|result| result.unwrap())
.collect()
}
/// Spawns `future` and returns the resulting [`Task`] handle.
///
/// Which executor is used depends on the active configuration: `web_task` on
/// web, the thread-local `LOCAL_EXECUTOR` with `std`, and the global
/// `LOCAL_EXECUTOR` static otherwise. In the two non-web cases the executor is
/// ticked right away until `try_tick` returns `false`.
pub fn spawn<T>(
&self,
future: impl Future<Output = T> + 'static + MaybeSend + MaybeSync,
) -> Task<T>
where
T: 'static + MaybeSend + MaybeSync,
{
crate::cfg::switch! {{
crate::cfg::web => {
web_task::spawn_local(future)
}
crate::cfg::std => {
LOCAL_EXECUTOR.with(|executor| {
let task = executor.spawn(future);
// Keep ticking for as long as the executor reports that it ran something.
while executor.try_tick() {}
task
})
}
_ => {
let task = LOCAL_EXECUTOR.spawn(future);
// Keep ticking for as long as the executor reports that it ran something.
while LOCAL_EXECUTOR.try_tick() {}
task
}
}}
}
/// Spawns `future` by forwarding to [`TaskPool::spawn`]; the two methods behave
/// identically in this implementation.
pub fn spawn_local<T>(
&self,
future: impl Future<Output = T> + 'static + MaybeSend + MaybeSync,
) -> Task<T>
where
T: 'static + MaybeSend + MaybeSync,
{
self.spawn(future)
}
/// Calls `f` with a reference to the executor behind `LOCAL_EXECUTOR` and
/// returns whatever `f` returns.
pub fn with_local_executor<F, R>(&self, f: F) -> R
where
F: FnOnce(&Executor) -> R,
{
crate::cfg::switch! {{
crate::cfg::std => {
// `LOCAL_EXECUTOR` is a thread-local here, so it is accessed through `with`.
LOCAL_EXECUTOR.with(f)
}
_ => {
// `LOCAL_EXECUTOR` is a plain static here and can be borrowed directly.
f(&LOCAL_EXECUTOR)
}
}}
}
}
/// Handle given to the closure passed to [`TaskPool::scope`]; futures spawned
/// through it have their outputs collected and returned by the scope call.
///
/// Futures spawned on the scope must be valid for `'scope`; the results buffer
/// is borrowed for `'env`, which must outlive `'scope`.
#[derive(Debug)]
pub struct Scope<'scope, 'env: 'scope, T> {
// Executor the spawned futures are placed on.
executor_ref: &'scope LocalExecutor<'scope>,
// Number of futures spawned on the scope that have not finished yet.
pending_tasks: &'scope Cell<usize>,
// One slot per spawned future, filled with that future's output once it completes.
results_ref: &'env RefCell<Vec<Option<T>>>,
// Marker fields only: a `&mut` to a type mentioning the lifetime makes `Scope`
// invariant over `'scope` and `'env`.
scope: PhantomData<&'scope mut &'scope ()>,
env: PhantomData<&'env mut &'env ()>,
}
impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
    /// Spawns `f` on the scope; its output is stored in the scope's results.
    ///
    /// Forwards to [`Scope::spawn_on_scope`].
    pub fn spawn<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
        self.spawn_on_scope(f);
    }

    /// Spawns `f` on the scope; its output is stored in the scope's results.
    ///
    /// Forwards to [`Scope::spawn_on_scope`].
    pub fn spawn_on_external<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
        self.spawn_on_scope(f);
    }

    /// Spawns `f` on the scope's executor and reserves a result slot for it.
    ///
    /// The slot index is fixed at spawn time, so results end up ordered by spawn
    /// order regardless of the order in which the futures complete.
    pub fn spawn_on_scope<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
        // Register the future as outstanding before it is handed to the executor.
        let outstanding = self.pending_tasks;
        outstanding.update(|count| count + 1);

        // Reserve an empty slot and remember where it is. The mutable borrow is
        // confined to this block so it is released before anything is spawned.
        let result_slots = self.results_ref;
        let slot_index = {
            let mut slots = result_slots.borrow_mut();
            let next_index = slots.len();
            slots.push(None);
            next_index
        };

        // Wrap the future so that it records its output and then marks itself done.
        let job = async move {
            let output = f.await;

            // The borrow is a temporary that ends with this statement.
            result_slots.borrow_mut()[slot_index] = Some(output);

            outstanding.update(|count| count - 1);
        };

        // The handle is detached: completion is tracked through the counter instead.
        self.executor_ref.spawn(job).detach();
    }
}
// Marker traits used as bounds on futures (and their outputs) accepted by
// `TaskPool::spawn` and `Scope::spawn*`. What they require depends on whether
// `std` is available.
crate::cfg::std! {
if {
// With `std`, both traits are implemented for every type, so the bounds
// impose no requirement.
pub trait MaybeSend {}
impl<T> MaybeSend for T {}
pub trait MaybeSync {}
impl<T> MaybeSync for T {}
} else {
// Without `std`, the traits have `Send` / `Sync` as supertraits and are only
// implemented for types that satisfy them.
pub trait MaybeSend: Send {}
impl<T: Send> MaybeSend for T {}
pub trait MaybeSync: Sync {}
impl<T: Sync> MaybeSync for T {}
}
}
#[cfg(test)]
mod test {
    use std::{thread, time::Duration};

    use super::*;

    /// Spawns a single scoped task that stays pending until another thread
    /// sends a value after a short delay.
    ///
    /// The scope must keep blocking while the task is pending and must return
    /// the received value once the task has been woken.
    #[test]
    fn scoped_spawn() {
        let (sender, receiver) = async_channel::unbounded();
        let task_pool = TaskPool {};

        let sender_thread = thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            // `Sender::send` only builds a future, so dropping it unpolled sends
            // nothing. `try_send` delivers immediately and cannot report "full"
            // on an unbounded channel.
            sender
                .try_send(0)
                .expect("an unbounded channel with a live receiver accepts the value");
        });

        let results = task_pool.scope(|scope| {
            scope.spawn(async { receiver.recv().await });
        });

        // Surface a panic in the helper thread instead of silently leaking it.
        sender_thread.join().expect("sender thread panicked");

        assert!(
            matches!(results.as_slice(), [Ok(0)]),
            "unexpected scope results: {results:?}"
        );
    }
}