use std::sync::Arc;
use std::vec::IntoIter;
use std::time::Duration;
use std::future::Future;
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::io::{Error, ErrorKind, Result};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::task::{Context, Poll, Waker};
use std::thread::{self, Builder};
use async_stream::stream;
use crossbeam_channel::{bounded, Sender};
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use crossbeam_queue::{ArrayQueue, SegQueue};
use st3::{StealError,
fifo::{Worker as FIFOWorker, Stealer as FIFOStealer}};
use flume::bounded as async_bounded;
use futures::{
future::{BoxFuture, FutureExt},
stream::{BoxStream, Stream, StreamExt},
task::waker_ref,
TryFuture,
};
use parking_lot::{Condvar, Mutex};
use rand::{Rng, thread_rng};
use num_cpus;
use wrr::IWRRSelector;
use quanta::{Clock, Instant as QInstant};
use log::warn;
use super::{
PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME, PI_ASYNC_THREAD_LOCAL_ID, DEFAULT_MAX_HIGH_PRIORITY_BOUNDED, DEFAULT_HIGH_PRIORITY_BOUNDED, DEFAULT_MAX_LOW_PRIORITY_BOUNDED, alloc_rt_uid, local_async_runtime, AsyncMapReduce, AsyncPipelineResult, AsyncRuntime,
AsyncRuntimeExt, AsyncTask, AsyncTaskPool, AsyncTaskPoolExt, AsyncTaskTimerByNotCancel, AsyncTimingTask,
AsyncWait, AsyncWaitAny, AsyncWaitAnyCallback, AsyncWaitTimeout, LocalAsyncWaitTimeout, LocalAsyncRuntime, TaskId, TaskHandle, YieldNow
};
// Initial number of worker threads for a runtime: at least 2 on native
// targets, a single worker on wasm32 (which has no real threads).
#[cfg(not(target_arch = "wasm32"))]
const DEFAULT_INIT_WORKER_SIZE: usize = 2;
#[cfg(target_arch = "wasm32")]
const DEFAULT_INIT_WORKER_SIZE: usize = 1;
// Prefix used when naming the worker threads of the default multi-task runtime.
const DEFAULT_WORKER_THREAD_PREFIX: &str = "Default-Multi-RT";
// Default stack size for spawned worker threads, in bytes (1 MiB).
const DEFAULT_THREAD_STACK_SIZE: usize = 1024 * 1024;
// Default idle sleep time for a worker thread (presumably milliseconds —
// TODO confirm against `spawn_worker_thread`).
const DEFAULT_WORKER_THREAD_SLEEP_TIME: u64 = 10;
// Default sleep time for the runtime itself (presumably milliseconds).
const DEFAULT_RUNTIME_SLEEP_TIME: u64 = 1000;
// Upper and lower bounds for the u8 weights fed to the IWRR queue selector.
const DEFAULT_MAX_WEIGHT: u8 = 254;
const DEFAULT_MIN_WEIGHT: u8 = 1;
// Task queue owned by a single worker thread of a `ComputationalTaskPool`:
// `stack` is a LIFO deque for the highest-priority (thread-local) tasks,
// `queue` is a lock-free FIFO for ordinary tasks, and `thread_waker` is the
// (sleeping-flag, lock, condvar) triple used to wake the owning thread.
struct ComputationalTaskQueue<O: Default + 'static> {
stack: Worker<Arc<AsyncTask<ComputationalTaskPool<O>, O>>>, queue: SegQueue<Arc<AsyncTask<ComputationalTaskPool<O>, O>>>, thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>, }
impl<O: Default + 'static> ComputationalTaskQueue<O> {
    /// Builds an empty per-thread task queue bound to the given thread waker.
    pub fn new(thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) -> Self {
        Self {
            stack: Worker::new_lifo(),
            queue: SegQueue::new(),
            thread_waker,
        }
    }

    /// Total number of tasks currently buffered in this queue (LIFO + FIFO).
    pub fn len(&self) -> usize {
        self.queue.len() + self.stack.len()
    }
}
// Computational (non-stealing) multi-thread task pool: one
// `ComputationalTaskQueue` per worker thread, an optional shared queue of
// wakers for sleeping workers (`waits`), and global produce/consume counters
// used to approximate the pool length.
pub struct ComputationalTaskPool<O: Default + 'static> {
workers: Vec<ComputationalTaskQueue<O>>, waits: Option<Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>>, consume_count: Arc<AtomicUsize>, produce_count: Arc<AtomicUsize>, }
// SAFETY(review): cross-thread sharing is intended — counters are atomic and
// `SegQueue`/`ArrayQueue` are lock-free. Note that each worker's crossbeam
// `Worker` deque is not `Sync`; these impls rely on the LIFO stack being
// touched only by its owning thread — confirm against the runtime's usage.
unsafe impl<O: Default + 'static> Send for ComputationalTaskPool<O> {}
unsafe impl<O: Default + 'static> Sync for ComputationalTaskPool<O> {}
impl<O: Default + 'static> Default for ComputationalTaskPool<O> {
    /// Creates a pool with one task queue per logical CPU (a single queue on
    /// wasm32, which cannot spawn worker threads).
    fn default() -> Self {
        #[cfg(not(target_arch = "wasm32"))]
        let worker_count = num_cpus::get();
        #[cfg(target_arch = "wasm32")]
        let worker_count = 1;
        Self::new(worker_count)
    }
}
impl<O: Default + 'static> AsyncTaskPool<O> for ComputationalTaskPool<O> {
type Pool = ComputationalTaskPool<O>;
// Reads the calling thread's id word from the runtime thread-local; the
// high 32 bits hold the owning runtime's uid and the low 32 bits the
// worker index (see `push_local`).
#[inline]
fn get_thread_id(&self) -> usize {
match PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| unsafe { *thread_id.get() }) {
Err(e) => {
// An unavailable thread-local means this is not a pool-managed
// thread (or it is tearing down): broken invariant, fail loudly.
panic!(
"Get thread id failed, thread: {:?}, reason: {:?}",
thread::current(),
e
);
}
Ok(id) => id,
}
}
// Approximate queued-task count: produced minus consumed, saturating at 0
// because the two relaxed counters may be read inconsistently.
#[inline]
fn len(&self) -> usize {
if let Some(len) = self
.produce_count
.load(Ordering::Relaxed)
.checked_sub(self.consume_count.load(Ordering::Relaxed))
{
len
} else {
0
}
}
// Pushes onto a worker queue chosen round-robin via the produce counter.
#[inline]
fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
let index = self.produce_count.fetch_add(1, Ordering::Relaxed) % self.workers.len();
self.workers[index].queue.push(task);
Ok(())
}
// Pushes onto the calling thread's own FIFO queue when the task belongs to
// this runtime (runtime uid in the high bits of the thread id matches the
// task's owner); otherwise falls back to round-robin `push`.
#[inline]
fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
let id = self.get_thread_id();
let rt_uid = task.owner();
if (id >> 32) == rt_uid {
let worker = &self.workers[id & 0xffffffff];
worker.queue.push(task);
self.produce_count.fetch_add(1, Ordering::Relaxed);
Ok(())
} else {
self.push(task)
}
}
// Routes by priority: the highest band goes onto the local LIFO stack,
// the high band onto the local FIFO queue, everything else round-robin.
#[inline]
fn push_priority(&self,
priority: usize,
task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
if priority >= DEFAULT_MAX_HIGH_PRIORITY_BOUNDED {
let id = self.get_thread_id();
let rt_uid = task.owner();
if (id >> 32) == rt_uid {
let worker = &self.workers[id & 0xffffffff];
worker.stack.push(task);
self.produce_count.fetch_add(1, Ordering::Relaxed);
Ok(())
} else {
self.push(task)
}
} else if priority >= DEFAULT_HIGH_PRIORITY_BOUNDED {
self.push_local(task)
} else {
self.push(task)
}
}
// Re-queues a task that should keep running, at high (local) priority.
#[inline]
fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
self.push_priority(DEFAULT_HIGH_PRIORITY_BOUNDED, task)
}
// Pops the next task for the calling thread: its LIFO stack first, then
// its FIFO queue; bumps the consume counter on success.
#[inline]
fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>> {
let id = self.get_thread_id() & 0xffffffff;
let worker = &self.workers[id];
let task = worker.stack.pop();
if task.is_some() {
self.consume_count.fetch_add(1, Ordering::Relaxed);
return task;
}
let task = worker.queue.pop();
if task.is_some() {
self.consume_count.fetch_add(1, Ordering::Relaxed);
}
task
}
// Drains every task currently poppable by this thread into an iterator.
#[inline]
fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>> {
let mut tasks = Vec::with_capacity(self.len());
while let Some(task) = self.try_pop() {
tasks.push(task);
}
tasks.into_iter()
}
// This pool exposes wakers via `clone_thread_waker` instead.
#[inline]
fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>> {
None
}
}
impl<O: Default + 'static> AsyncTaskPoolExt<O> for ComputationalTaskPool<O> {
// Installs the shared queue of sleeping-worker wakers.
#[inline]
fn set_waits(&mut self, waits: Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>) {
self.waits = Some(waits);
}
// Returns the shared waker queue, if one has been installed.
#[inline]
fn get_waits(&self) -> Option<&Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>> {
self.waits.as_ref()
}
// Number of per-thread worker queues in this pool.
#[inline]
fn worker_len(&self) -> usize {
self.workers.len()
}
// Clones the calling thread's queue waker. Panics when the thread index is
// out of range, so this must only be called from pool-managed threads.
#[inline]
fn clone_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
let worker = &self.workers[self.get_thread_id() & 0xffffffff];
Some(worker.thread_waker.clone())
}
}
impl<O: Default + 'static> ComputationalTaskPool<O> {
    /// Creates a pool with `size` per-thread queues; `size` is raised to
    /// `DEFAULT_INIT_WORKER_SIZE` when below that minimum.
    pub fn new(mut size: usize) -> Self {
        if size < DEFAULT_INIT_WORKER_SIZE {
            size = DEFAULT_INIT_WORKER_SIZE;
        }
        // Each worker gets its own (sleep flag, lock, condvar) waker triple.
        let workers = (0..size)
            .map(|_| {
                let waker =
                    Arc::new((AtomicBool::new(false), Mutex::new(()), Condvar::new()));
                ComputationalTaskQueue::new(waker)
            })
            .collect();
        Self {
            workers,
            waits: None,
            consume_count: Arc::new(AtomicUsize::new(0)),
            produce_count: Arc::new(AtomicUsize::new(0)),
        }
    }
}
// Work-stealing task queue owned by one worker thread of a
// `StealableTaskPool`:
// - `stack`: single-slot buffer for the highest-priority task, read/written
//   only by the owning thread (hence the `UnsafeCell`);
// - `internal`: bounded st3 FIFO for locally spawned tasks (stealable);
// - `external`: crossbeam FIFO deque fed from the pool's public injector;
// - `selector`: interleaved weighted round-robin deciding which of the two
//   queues to pop from next;
// - `thread_waker`: (sleep flag, lock, condvar) used to wake the thread.
struct StealableTaskQueue<O: Default + 'static> {
stack: UnsafeCell<Option<Arc<AsyncTask<StealableTaskPool<O>, O>>>>, internal: FIFOWorker<Arc<AsyncTask<StealableTaskPool<O>, O>>>, external: Worker<Arc<AsyncTask<StealableTaskPool<O>, O>>>, selector: UnsafeCell<IWRRSelector<2>>, thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>, }
impl<O: Default + 'static> StealableTaskQueue<O> {
// Builds the queue plus the stealer handles other workers use to steal
// from its internal (st3) and external (crossbeam) queues.
pub fn new(
init_queue_capacity: usize,
thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>,
) -> (Self,
FIFOStealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>,
Stealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>) {
let stack = UnsafeCell::new(None);
let internal = FIFOWorker::new(init_queue_capacity);
let external = Worker::new_fifo();
let internal_stealer = internal.stealer();
let external_stealer = external.stealer();
// Initial IWRR weights favour the external slot 2:1 until the pool's
// periodic traffic-based adjustment kicks in (see `try_pop_by_weight`).
let selector = UnsafeCell::new(IWRRSelector::new([2, 1]));
(
StealableTaskQueue {
stack,
internal,
external,
selector,
thread_waker,
},
internal_stealer,
external_stealer
)
}
// The single-slot stack can hold at most one task.
pub const fn stack_capacity(&self) -> usize {
1
}
// Total capacity of the bounded internal queue.
pub fn internal_capacity(&self) -> usize {
self.internal.capacity()
}
// Free slots remaining in the bounded internal queue.
pub fn remaining_internal_capacity(&self) -> usize {
self.internal.spare_capacity()
}
// 1 when the stack slot is occupied, 0 otherwise.
// SAFETY(review): reads the `UnsafeCell` without synchronization — assumes
// only the owning thread touches the slot; confirm against pool usage.
#[inline]
pub fn stack_len(&self) -> usize {
unsafe {
if (&*self.stack.get()).is_some() {
1
} else {
0
}
}
}
// Number of tasks in the internal queue (capacity minus free slots).
pub fn internal_len(&self) -> usize {
self
.internal_capacity()
.checked_sub(self.remaining_internal_capacity())
.unwrap_or(0)
}
// Number of tasks in the external deque.
pub fn external_len(&self) -> usize {
self.external.len()
}
}
// Work-stealing multi-thread task pool. `public` is the shared injector for
// tasks pushed from outside (or overflowing) worker threads; each worker owns
// a `StealableTaskQueue`, and the two stealer vectors let any worker steal
// from the others' internal/external queues. The produce/consume/traffic
// counters feed the periodic IWRR weight adjustment: `weights` are the
// initial weights, `interval` the adjustment period (ms), and
// `clock`/`last_time` the quanta clock state used to time it.
pub struct StealableTaskPool<O: Default + 'static> {
public: Injector<Arc<AsyncTask<StealableTaskPool<O>, O>>>, workers: Vec<StealableTaskQueue<O>>, internal_stealers: Vec<FIFOStealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>>, external_stealers: Vec<Stealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>>, internal_consume: AtomicUsize, internal_produce: AtomicUsize, internal_traffic_statistics: AtomicUsize, external_consume: AtomicUsize, external_produce: AtomicUsize, external_traffic_statistics: AtomicUsize, weights: [u8; 2], clock: Clock, interval: usize, last_time: UnsafeCell<QInstant>, }
// SAFETY(review): sharing is intended — counters are atomic and the injector
// and stealers are lock-free. The `UnsafeCell` fields (`stack`, `selector`,
// `last_time`) are only ever accessed by the worker thread that owns the
// queue (worker index derived from the thread-local id) — confirm that no
// other code path touches them cross-thread.
unsafe impl<O: Default + 'static> Send for StealableTaskPool<O> {}
unsafe impl<O: Default + 'static> Sync for StealableTaskPool<O> {}
impl<O: Default + 'static> Default for StealableTaskPool<O> {
fn default() -> Self {
StealableTaskPool::new()
}
}
impl<O: Default + 'static> AsyncTaskPool<O> for StealableTaskPool<O> {
    type Pool = StealableTaskPool<O>;

    /// Reads the calling thread's id word; the high 32 bits hold the owning
    /// runtime's uid and the low 32 bits the worker index.
    ///
    /// # Panics
    /// Panics when called from a thread without the runtime thread-local.
    #[inline]
    fn get_thread_id(&self) -> usize {
        match PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| unsafe { *thread_id.get() }) {
            Err(e) => {
                panic!(
                    "Get thread id failed, thread: {:?}, reason: {:?}",
                    thread::current(),
                    e
                );
            }
            Ok(id) => id,
        }
    }

    /// Approximate queued-task count: (internal produced − consumed) +
    /// (external produced − consumed), each saturating at zero because the
    /// relaxed counters may be read inconsistently.
    #[inline]
    fn len(&self) -> usize {
        self.internal_produce
            .load(Ordering::Relaxed)
            .checked_sub(self.internal_consume.load(Ordering::Relaxed))
            .unwrap_or(0)
            + self
                .external_produce
                .load(Ordering::Relaxed)
                .checked_sub(self.external_consume.load(Ordering::Relaxed))
                .unwrap_or(0)
    }

    /// Pushes a task onto the shared public injector (external side).
    #[inline]
    fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
        self.public.push(task);
        self.external_produce.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    /// Pushes onto the calling worker's bounded internal queue when the task
    /// belongs to this runtime and there is room; otherwise falls back to
    /// the public injector.
    #[inline]
    fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
        let id = self.get_thread_id();
        let rt_uid = task.owner();
        if (id >> 32) == rt_uid {
            let worker = &self.workers[id & 0xffffffff];
            if worker.remaining_internal_capacity() > 0 {
                let _ = worker.internal.push(task);
                self.internal_produce.fetch_add(1, Ordering::Relaxed);
                Ok(())
            } else {
                self.push(task)
            }
        } else {
            self.push(task)
        }
    }

    /// Routes by priority: the highest band goes into the worker's
    /// single-slot stack (or its internal queue when the slot is taken),
    /// the high band into the internal queue, the rest onto the injector.
    #[inline]
    fn push_priority(&self,
                     priority: usize,
                     task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
        if priority >= DEFAULT_MAX_HIGH_PRIORITY_BOUNDED {
            let id = self.get_thread_id();
            let rt_uid = task.owner();
            if (id >> 32) == rt_uid {
                let worker = &self.workers[id & 0xffffffff];
                if worker.stack_len() < 1 {
                    // SAFETY: the single-slot stack is only touched by the
                    // owning worker thread, checked via the thread id above.
                    unsafe {
                        *worker.stack.get() = Some(task);
                    }
                } else if worker.remaining_internal_capacity() > 0 {
                    let _ = worker.internal.push(task);
                } else {
                    return self.push(task);
                }
                self.internal_produce.fetch_add(1, Ordering::Relaxed);
                Ok(())
            } else {
                self.push(task)
            }
        } else if priority >= DEFAULT_HIGH_PRIORITY_BOUNDED {
            self.push_local(task)
        } else {
            self.push(task)
        }
    }

    /// Re-queues a task that should keep running, at high (local) priority.
    #[inline]
    fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
        self.push_priority(DEFAULT_HIGH_PRIORITY_BOUNDED, task)
    }

    /// Pops the next task for the calling worker: the single-slot stack
    /// first, then the weighted internal/external queues.
    #[inline]
    fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>> {
        let id = self.get_thread_id() & 0xffffffff;
        let worker = &self.workers[id];
        // SAFETY: only the owning worker thread reads/writes its stack slot.
        let task = unsafe { (&mut *worker.stack.get()).take() };
        if task.is_some() {
            // Fix: a stack push incremented `internal_produce`
            // (`push_priority`), so the matching pop must increment
            // `internal_consume` — otherwise `len()` drifts upward forever.
            self.internal_consume.fetch_add(1, Ordering::Relaxed);
            return task;
        }
        try_pop_by_weight(self, worker, id)
    }

    /// Drains every task currently poppable by this worker.
    #[inline]
    fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>> {
        let mut tasks = Vec::with_capacity(self.len());
        while let Some(task) = self.try_pop() {
            tasks.push(task);
        }
        tasks.into_iter()
    }

    /// Wakers are exposed via `clone_thread_waker` instead.
    #[inline]
    fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>> {
        None
    }
}
/// Returns the 1-based position of the most significant set bit of `n`
/// (i.e. the bit length of `n`); returns 0 when `n == 0`.
const fn get_msb(n: usize) -> usize {
    (usize::BITS - n.leading_zeros()) as usize
}
// Pops a task for `local_worker`, choosing between the external and internal
// sides with an interleaved weighted round-robin selector. Every
// `pool.interval` milliseconds the weights are recomputed from the
// produce-counter deltas so the busier side is polled proportionally more
// often; whichever side loses the draw is still tried as a fallback.
fn try_pop_by_weight<O: Default + 'static>(pool: &StealableTaskPool<O>,
local_worker: &StealableTaskQueue<O>,
local_worker_id: usize)
-> Option<Arc<AsyncTask<StealableTaskPool<O>, O>>> {
unsafe {
// Milliseconds since the weights were last adjusted.
// NOTE(review): `quanta::Clock::recent()` only advances while an Upkeep
// thread is running — confirm one is started elsewhere in the crate.
let duration = pool
.clock
.recent()
.duration_since(*pool.last_time.get())
.as_millis() as usize;
if duration >= pool.interval {
// Snapshot the produce counters and derive per-interval deltas
// (clamped to at least 1 so the ratio below is well defined).
let new_external_traffic_statistics = pool
.external_produce
.load(Ordering::Relaxed);
let new_internal_traffic_statistics = pool
.internal_produce
.load(Ordering::Relaxed);
let external_delta = if new_external_traffic_statistics == 0 {
1
} else {
new_external_traffic_statistics
.checked_sub(pool
.external_traffic_statistics
.load(Ordering::Relaxed))
.unwrap_or(1)
};
pool
.external_traffic_statistics
.store(new_external_traffic_statistics, Ordering::Relaxed); let internal_delta = if new_internal_traffic_statistics == 0 {
1
} else {
new_internal_traffic_statistics
.checked_sub(pool
.internal_traffic_statistics
.load(Ordering::Relaxed))
.unwrap_or(1)
};
pool
.internal_traffic_statistics
.store(new_internal_traffic_statistics, Ordering::Relaxed);
// Scale both deltas by the smaller delta's bit length so the busier
// side's weight reflects the traffic ratio while staying within the
// u8 weight bounds; equal traffic resets both weights to 1.
let selector = &mut *local_worker.selector.get();
if external_delta > internal_delta {
let msb = get_msb(internal_delta);
let internal_weight
= (internal_delta >> msb.checked_sub(2).unwrap_or(0)).max(1);
let external_weight
= ((external_delta >> msb).min(DEFAULT_MAX_WEIGHT as usize)).max(1);
selector.change_weight(0, external_weight as u8);
selector.change_weight(1, internal_weight as u8);
} else if external_delta < internal_delta {
let msb = get_msb(external_delta);
let external_weight
= (external_delta >> msb.checked_sub(2).unwrap_or(0)).max(1);
let internal_weight
= ((internal_delta >> msb).min(DEFAULT_MAX_WEIGHT as usize)).max(1);
selector.change_weight(0, external_weight as u8);
selector.change_weight(1, internal_weight as u8);
} else {
selector.change_weight(0, 1);
selector.change_weight(1, 1);
}
*pool.last_time.get() = pool.clock.recent(); }
// Slot 0 = external side, slot 1 = internal side; the loser of the
// weighted draw is still tried when the winner comes up empty.
match (&mut *local_worker.selector.get()).select() {
0 => {
let task = try_pop_external(pool, local_worker, local_worker_id);
if task.is_some() {
task
} else {
try_pop_internal(pool, local_worker, local_worker_id)
}
},
_ => {
let task = try_pop_internal(pool, local_worker, local_worker_id);
if task.is_some() {
task
} else {
try_pop_external(pool, local_worker, local_worker_id)
}
},
}
}
}
// Pops from this worker's internal (st3) queue; when it is empty, tries to
// steal from randomly chosen other workers' internal queues until one yields
// a task or all candidates are exhausted. Increments `internal_consume` for
// the single task returned.
#[inline]
fn try_pop_internal<O: Default + 'static>(pool: &StealableTaskPool<O>,
local_worker: &StealableTaskQueue<O>,
local_worker_id: usize)
-> Option<Arc<AsyncTask<StealableTaskPool<O>, O>>> {
let task = local_worker
.internal
.pop();
if task.is_some() {
pool
.internal_consume
.fetch_add(1, Ordering::Relaxed);
task
} else {
let mut gen = thread_rng();
// Candidate victims: every other worker's internal stealer.
let mut worker_stealers: Vec<&FIFOStealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>> = pool
.internal_stealers
.iter()
.enumerate()
.filter_map(|(index, other)| {
if index != local_worker_id {
Some(other)
} else {
None
}
})
.collect();
let remaining_len = local_worker.remaining_internal_capacity();
// Try victims in random order, removing each after one attempt.
loop {
if worker_stealers.len() == 0 {
break;
}
let index = gen.gen_range(0..worker_stealers.len());
let worker_stealer = worker_stealers.swap_remove(index);
match worker_stealer.steal_and_pop(&local_worker.internal,
|count| {
// Steal roughly half the victim's tasks (at least 1), capped by
// this worker's spare internal capacity.
let stealable_len = count / 2;
if stealable_len <= remaining_len {
if stealable_len == 0 {
1
} else {
stealable_len
}
} else {
remaining_len
}
}) {
Err(StealError::Empty) => {
continue;
},
Err(StealError::Busy) => {
continue;
},
Ok((task, _)) => {
pool.internal_consume.fetch_add(1, Ordering::Relaxed);
return Some(task);
},
}
}
None
}
}
// Pops from this worker's external deque; when empty, refills from the
// pool's public injector, and as a last resort steals a batch from a
// randomly chosen other worker's external deque.
#[inline]
fn try_pop_external<O: Default + 'static>(pool: &StealableTaskPool<O>,
local_worker: &StealableTaskQueue<O>,
local_worker_id: usize)
-> Option<Arc<AsyncTask<StealableTaskPool<O>, O>>> {
let task = local_worker
.external
.pop();
if task.is_some() {
pool
.external_consume
.fetch_add(1, Ordering::Relaxed);
task
} else {
// Pull a batch from the public injector into the local deque.
let task = try_pop_public(pool, local_worker);
if task.is_some() {
pool
.external_consume
.fetch_add(1, Ordering::Relaxed);
task
} else {
let mut gen = thread_rng();
// Candidate victims: every other worker's external stealer.
let mut worker_stealers: Vec<&Stealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>> = pool
.external_stealers
.iter()
.enumerate()
.filter_map(|(index, other)| {
if index != local_worker_id {
Some(other)
} else {
None
}
})
.collect();
// Try victims in random order, removing each after one attempt.
loop {
if worker_stealers.len() == 0 {
break;
}
let index = gen.gen_range(0..worker_stealers.len());
let worker_stealer = worker_stealers.swap_remove(index);
match worker_stealer.steal_batch_and_pop(&local_worker.external) {
Steal::Success(task) => {
pool.external_consume.fetch_add(1, Ordering::Relaxed);
return Some(task);
},
Steal::Retry => {
continue;
},
Steal::Empty => {
continue;
},
}
}
None
}
}
}
// Steals a batch of tasks from the pool's public injector into this worker's
// external deque and pops one of them, retrying while the injector reports
// contention. Returns `None` once the injector is empty.
//
// Consumption accounting is done by the caller: `try_pop_external`
// increments `external_consume` whenever this returns `Some`, so
// incrementing it here as well (as the code previously did) double-counted
// the popped task and skewed `AsyncTaskPool::len`. Tasks merely transferred
// into the deque by the batch steal are counted later, when popped.
#[inline]
fn try_pop_public<O: Default + 'static>(_pool: &StealableTaskPool<O>,
                                        local_worker: &StealableTaskQueue<O>)
                                        -> Option<Arc<AsyncTask<StealableTaskPool<O>, O>>> {
    loop {
        match _pool.public.steal_batch_and_pop(&local_worker.external) {
            Steal::Empty => {
                return None;
            },
            Steal::Retry => {
                continue;
            },
            Steal::Success(task) => {
                return Some(task);
            },
        }
    }
}
impl<O: Default + 'static> AsyncTaskPoolExt<O> for StealableTaskPool<O> {
    /// Number of worker queues owned by this pool.
    #[inline]
    fn worker_len(&self) -> usize {
        self.workers.len()
    }

    /// Clones the calling worker thread's waker, or `None` when the calling
    /// thread has no queue in this pool.
    #[inline]
    fn clone_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
        self.workers
            .get(self.get_thread_id() & 0xffffffff)
            .map(|worker| worker.thread_waker.clone())
    }
}
impl<O: Default + 'static> StealableTaskPool<O> {
    /// Creates a pool sized at twice the physical core count (one worker on
    /// wasm32), with 0x8000-slot internal queues, equal initial weights and
    /// a 3000ms weight-adjustment interval.
    pub fn new() -> Self {
        #[cfg(not(target_arch = "wasm32"))]
        let size = num_cpus::get_physical() * 2;
        #[cfg(target_arch = "wasm32")]
        let size = 1;
        StealableTaskPool::with(size,
                                0x8000,
                                [1, 1],
                                3000)
    }

    /// Creates a pool with `worker_size` worker queues, the given bounded
    /// internal queue capacity, initial IWRR `weights`, and the
    /// weight-adjustment `interval` in milliseconds.
    ///
    /// # Panics
    /// Panics when `worker_size` or `interval` is zero.
    pub fn with(worker_size: usize,
                internal_queue_capacity: usize,
                weights: [u8; 2],
                interval: usize) -> Self {
        if worker_size == 0 {
            panic!(
                "Create WorkerTaskPool failed, worker size: {}, reason: invalid worker size",
                worker_size
            );
        }
        if interval == 0 {
            // Fix: report the offending `interval` value — this message
            // previously printed `worker_size` by mistake.
            panic!(
                "Create WorkerTaskPool failed, interval: {}, reason: invalid interval",
                interval
            );
        }
        let public = Injector::new();
        let mut workers = Vec::with_capacity(worker_size);
        let mut internal_stealers = Vec::with_capacity(worker_size);
        let mut external_stealers = Vec::with_capacity(worker_size);
        for _ in 0..worker_size {
            let thread_waker = Arc::new((AtomicBool::new(false), Mutex::new(()), Condvar::new()));
            let (worker,
                 internal_stealer,
                 external_stealer) =
                StealableTaskQueue::new(internal_queue_capacity,
                                        thread_waker);
            workers.push(worker);
            internal_stealers.push(internal_stealer);
            external_stealers.push(external_stealer);
        }
        let internal_consume = AtomicUsize::new(0);
        let internal_produce = AtomicUsize::new(0);
        let internal_traffic_statistics = AtomicUsize::new(0);
        let external_consume = AtomicUsize::new(0);
        let external_produce = AtomicUsize::new(0);
        let external_traffic_statistics = AtomicUsize::new(0);
        let clock = Clock::new();
        let last_time = UnsafeCell::new(clock.recent());
        StealableTaskPool {
            public,
            workers,
            internal_stealers,
            external_stealers,
            internal_consume,
            internal_produce,
            internal_traffic_statistics,
            external_consume,
            external_produce,
            external_traffic_statistics,
            // NOTE(review): `weights` is stored but the per-worker selectors
            // are created with fixed [2, 1] weights — confirm intended.
            weights,
            clock,
            interval,
            last_time,
        }
    }
}
// Handle to a multi-thread task runtime; cheap to clone (shared `Arc`).
pub struct MultiTaskRuntime<
O: Default + 'static = (),
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = StealableTaskPool<O>,
>(
Arc<(
usize, //unique runtime id
Arc<P>, //async task pool
Option<
Vec<(
Sender<(usize, AsyncTimingTask<P, O>)>,
Arc<AsyncTaskTimerByNotCancel<P, O>>,
)>,
>, //producers for sleeping async tasks and the per-thread local timers
AtomicUsize, //timed-task counter
Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>, //queue of wakers for sleeping worker threads
AtomicUsize, //timer produce count
AtomicUsize, //timer consume count
)>,
);
// SAFETY(review): all shared state sits behind the inner `Arc` and is
// accessed through atomics, lock-free queues, or the pool's own
// synchronization; the handle itself carries no thread-affine data. Relies
// on `P`'s own (unsafe) Send/Sync impls being sound.
unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Send
for MultiTaskRuntime<O, P>
{
}
unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Sync
for MultiTaskRuntime<O, P>
{
}
impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Clone
for MultiTaskRuntime<O, P>
{
fn clone(&self) -> Self {
MultiTaskRuntime(self.0.clone())
}
}
impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>> AsyncRuntime<O>
    for MultiTaskRuntime<O, P>
{
    type Pool = P;

    /// Shared handle to the runtime's task pool.
    fn shared_pool(&self) -> Arc<Self::Pool> {
        (self.0).1.clone()
    }

    /// Unique id of this runtime.
    fn get_id(&self) -> usize {
        (self.0).0
    }

    /// Timed tasks produced but not yet consumed, saturating at 0 (the two
    /// relaxed counters may be read inconsistently).
    fn wait_len(&self) -> usize {
        (self.0)
            .5
            .load(Ordering::Relaxed)
            .checked_sub((self.0).6.load(Ordering::Relaxed))
            .unwrap_or(0)
    }

    /// Tasks currently queued in the pool.
    fn len(&self) -> usize {
        (self.0).1.len()
    }

    /// Allocates a task id packing a fresh raw task handle in the high
    /// 64 bits and this runtime's id in the low 64 bits.
    fn alloc<R: 'static>(&self) -> TaskId {
        TaskId(UnsafeCell::new((TaskHandle::<R>::default().into_raw() as u128) << 64 | self.get_id() as u128 & 0xffffffffffffffff))
    }

    /// Spawns `future` at the default (lowest) priority.
    fn spawn<F>(&self, future: F) -> Result<TaskId>
    where
        F: Future<Output = O> + Send + 'static,
    {
        let task_id = self.alloc::<F::Output>();
        self.spawn_by_id(task_id.clone(), future)?;
        Ok(task_id)
    }

    /// Spawns `future`, preferring the calling worker's local queue.
    fn spawn_local<F>(&self, future: F) -> Result<TaskId>
    where
        F: Future<Output = O> + Send + 'static,
    {
        let task_id = self.alloc::<F::Output>();
        self.spawn_local_by_id(task_id.clone(), future)?;
        Ok(task_id)
    }

    /// Spawns `future` with an explicit priority.
    fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
    where
        F: Future<Output = O> + Send + 'static,
    {
        let task_id = self.alloc::<F::Output>();
        self.spawn_priority_by_id(task_id.clone(), priority, future)?;
        Ok(task_id)
    }

    /// Spawns `future` at yield (high local) priority.
    fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
    where
        F: Future<Output = O> + Send + 'static,
    {
        let task_id = self.alloc::<F::Output>();
        self.spawn_yield_by_id(task_id.clone(), future)?;
        Ok(task_id)
    }

    /// Spawns `future` so it starts running after `time` (timer units).
    fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
    where
        F: Future<Output = O> + Send + 'static,
    {
        let task_id = self.alloc::<F::Output>();
        self.spawn_timing_by_id(task_id.clone(), future, time)?;
        Ok(task_id)
    }

    /// Spawns with a caller-supplied id at the lowest priority, then wakes
    /// one sleeping worker (if any) so the task is picked up promptly.
    fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
    where
        F: Future<Output = O> + Send + 'static,
    {
        let result = (self.0).1.push(Arc::new(AsyncTask::new(
            task_id,
            (self.0).1.clone(),
            DEFAULT_MAX_LOW_PRIORITY_BOUNDED,
            Some(future.boxed()),
        )));
        if let Some(worker_waker) = (self.0).4.pop() {
            let (is_sleep, lock, condvar) = &*worker_waker;
            let _locked = lock.lock();
            if is_sleep.load(Ordering::Relaxed) {
                // Clear the sleep flag before notifying so the woken worker
                // does not immediately park again.
                is_sleep.store(false, Ordering::SeqCst);
                condvar.notify_one();
            }
        }
        result
    }

    /// Spawns with a caller-supplied id onto the calling worker's local
    /// queue at high priority.
    fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
    where
        F: Future<Output = O> + Send + 'static,
    {
        (self.0).1.push_local(Arc::new(AsyncTask::new(
            task_id,
            (self.0).1.clone(),
            DEFAULT_HIGH_PRIORITY_BOUNDED,
            Some(future.boxed()),
        )))
    }

    /// Spawns with a caller-supplied id and priority, then wakes one
    /// sleeping worker (if any).
    fn spawn_priority_by_id<F>(&self,
                               task_id: TaskId,
                               priority: usize,
                               future: F) -> Result<()>
    where
        F: Future<Output = O> + Send + 'static,
    {
        let result = (self.0).1.push_priority(priority, Arc::new(AsyncTask::new(
            task_id,
            (self.0).1.clone(),
            priority,
            Some(future.boxed()),
        )));
        if let Some(worker_waker) = (self.0).4.pop() {
            let (is_sleep, lock, condvar) = &*worker_waker;
            let _locked = lock.lock();
            if is_sleep.load(Ordering::Relaxed) {
                // Clear the sleep flag before notifying (see spawn_by_id).
                is_sleep.store(false, Ordering::SeqCst);
                condvar.notify_one();
            }
        }
        result
    }

    /// Spawns with a caller-supplied id at yield priority.
    #[inline]
    fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
    where
        F: Future<Output = O> + Send + 'static,
    {
        self.spawn_priority_by_id(task_id,
                                  DEFAULT_HIGH_PRIORITY_BOUNDED,
                                  future)
    }

    /// Spawns a wrapper task that registers `future` on the executing
    /// thread's local timer, to be re-spawned at max priority after `time`.
    /// Without timers the future is silently dropped.
    fn spawn_timing_by_id<F>(&self,
                             task_id: TaskId,
                             future: F,
                             time: usize) -> Result<()>
    where
        F: Future<Output = O> + Send + 'static,
    {
        let rt = self.clone();
        self.spawn_by_id(task_id, async move {
            if let Some(timers) = &(rt.0).2 {
                let id = (rt.0).1.get_thread_id() & 0xffffffff;
                let (_, timer) = &timers[id];
                timer.set_timer(
                    AsyncTimingTask::WaitRun(Arc::new(AsyncTask::new(
                        rt.alloc::<F::Output>(),
                        (rt.0).1.clone(),
                        DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
                        Some(future.boxed()),
                    ))),
                    time,
                );
                // Count one produced timed task.
                (rt.0).5.fetch_add(1, Ordering::Relaxed);
            }
            Default::default()
        })
    }

    /// Parks the task: stores the waker on the task id and reports Pending.
    fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output> {
        task_id.set_waker::<Output>(waker);
        Poll::Pending
    }

    /// Wakes the task identified by `task_id`.
    fn wakeup<Output: 'static>(&self, task_id: &TaskId) {
        task_id.wakeup::<Output>();
    }

    /// Builds a single-value wait (a 2-capacity `wait_any` underneath).
    fn wait<V: Send + 'static>(&self) -> AsyncWait<V> {
        AsyncWait(self.wait_any(2))
    }

    /// Builds a first-of-many wait backed by a bounded flume channel.
    fn wait_any<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
        let (producor, consumer) = async_bounded(capacity);
        AsyncWaitAny {
            capacity,
            producor,
            consumer,
        }
    }

    /// Like `wait_any`, but with a completion callback.
    fn wait_any_callback<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
        let (producor, consumer) = async_bounded(capacity);
        AsyncWaitAnyCallback {
            capacity,
            producor,
            consumer,
        }
    }

    /// Builds a map-reduce helper backed by a bounded flume channel.
    fn map_reduce<V: Send + 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
        let (producor, consumer) = async_bounded(capacity);
        AsyncMapReduce {
            count: 0,
            capacity,
            producor,
            consumer,
        }
    }

    /// Returns a future that completes after `timeout` (timer units): a
    /// local timer when called from one of this runtime's worker threads, a
    /// foreign timer channel otherwise, and a blocking sleep when the
    /// runtime has no timers at all.
    fn timeout(&self, timeout: usize) -> BoxFuture<'static, ()> {
        let rt = self.clone();
        if let Some(timers) = &(self.0).2 {
            match PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
                let thread_id = unsafe { *thread_id.get() };
                let index = thread_id & 0xffffffff;
                // Fix: the bounds check must be `>=`; with the previous `>`
                // an index equal to `timers.len()` fell through to the local
                // branch and panicked on the out-of-bounds `timers[index]`.
                if index >= timers.len() {
                    // Not one of this runtime's workers: hand the request to
                    // some runtime timer via its producer channel.
                    // NOTE(review): the round-robin counter `(self.0).3` is
                    // only read here — confirm it is advanced elsewhere.
                    TimerTaskProducor::Foreign(timers[(self.0).3.load(Ordering::Relaxed) % timers.len()].0.clone())
                } else {
                    TimerTaskProducor::Local(timers[index].1.clone())
                }
            }) {
                Err(_) => {
                    panic!("Multi thread runtime timeout failed, reason: local thread id not match")
                }
                Ok(producor) => match producor {
                    TimerTaskProducor::Local(timer) => {
                        LocalAsyncWaitTimeout::new(rt, timer, timeout).boxed()
                    },
                    TimerTaskProducor::Foreign(producor) => {
                        AsyncWaitTimeout::new(rt, producor, timeout).boxed()
                    },
                },
            }
        } else {
            // No timers: fall back to blocking the executing worker thread.
            async move {
                thread::sleep(Duration::from_millis(timeout as u64));
            }
            .boxed()
        }
    }

    /// Returns a future that yields once and then completes.
    fn yield_now(&self) -> BoxFuture<'static, ()> {
        async move {
            YieldNow(false).await;
        }.boxed()
    }

    /// Streams `input` through `filter`, ending the stream on
    /// `AsyncPipelineResult::Disconnect`.
    fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> BoxStream<'static, FO>
    where
        S: Stream<Item = SO> + Send + 'static,
        SO: Send + 'static,
        F: FnMut(SO) -> AsyncPipelineResult<FO> + Send + 'static,
        FO: Send + 'static,
    {
        let output = stream! {
            for await value in input {
                match filter(value) {
                    AsyncPipelineResult::Disconnect => {
                        break;
                    },
                    AsyncPipelineResult::Filtered(result) => {
                        yield result;
                    },
                }
            }
        };
        output.boxed()
    }

    /// Multi-thread runtimes cannot be closed through this handle.
    fn close(&self) -> bool {
        false
    }
}
impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>> AsyncRuntimeExt<O>
    for MultiTaskRuntime<O, P>
{
    /// Spawns a task carrying an arbitrary `context` value at the lowest
    /// priority, then wakes one sleeping worker (if any).
    fn spawn_with_context<F, C>(&self, task_id: TaskId, future: F, context: C) -> Result<()>
    where
        F: Future<Output = O> + Send + 'static,
        C: 'static,
    {
        let task = Arc::new(AsyncTask::with_context(
            task_id,
            (self.0).1.clone(),
            DEFAULT_MAX_LOW_PRIORITY_BOUNDED,
            Some(future.boxed()),
            context,
        ));
        let result = (self.0).1.push(task);
        if let Some(worker_waker) = (self.0).4.pop() {
            let (is_sleep, lock, condvar) = &*worker_waker;
            let _locked = lock.lock();
            if is_sleep.load(Ordering::Relaxed) {
                // Clear the sleep flag before notifying so the woken worker
                // does not immediately park again.
                is_sleep.store(false, Ordering::SeqCst);
                condvar.notify_one();
            }
        }
        result
    }

    /// Spawns a context-carrying task that starts running after `time`,
    /// registered on the executing thread's local timer. Without timers the
    /// future is silently dropped.
    fn spawn_timing_with_context<F, C>(
        &self,
        task_id: TaskId,
        future: F,
        context: C,
        time: usize,
    ) -> Result<()>
    where
        F: Future<Output = O> + Send + 'static,
        C: Send + 'static,
    {
        let rt = self.clone();
        self.spawn_by_id(task_id, async move {
            if let Some(timers) = &(rt.0).2 {
                let id = (rt.0).1.get_thread_id() & 0xffffffff;
                let (_, timer) = &timers[id];
                timer.set_timer(
                    AsyncTimingTask::WaitRun(Arc::new(AsyncTask::with_context(
                        rt.alloc::<F::Output>(),
                        (rt.0).1.clone(),
                        DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
                        Some(future.boxed()),
                        context,
                    ))),
                    time,
                );
                // Count one produced timed task.
                (rt.0).5.fetch_add(1, Ordering::Relaxed);
            }
            Default::default()
        })
    }

    /// Runs `future` on this runtime and blocks the calling thread until it
    /// finishes. Refuses (WouldBlock) when called from this same runtime,
    /// which would deadlock a worker.
    fn block_on<F>(&self, future: F) -> Result<F::Output>
    where
        F: Future + Send + 'static,
        <F as Future>::Output: Default + Send + 'static,
    {
        if let Some(local_rt) = local_async_runtime::<F::Output>() {
            if local_rt.get_id() == self.get_id() {
                return Err(Error::new(
                    ErrorKind::WouldBlock,
                    format!("Block on failed, reason: would block"),
                ));
            }
        }
        let (sender, receiver) = bounded(1);
        if let Err(e) = self.spawn(async move {
            let r = future.await;
            // Fix: discard the `#[must_use]` send result explicitly — a send
            // error only means the receiver was already dropped, in which
            // case `recv` below has already reported the failure.
            let _ = sender.send(r);
            Default::default()
        }) {
            return Err(Error::new(
                ErrorKind::Other,
                format!("Block on failed, reason: {:?}", e),
            ));
        }
        match receiver.recv() {
            Err(e) => Err(Error::new(
                ErrorKind::Other,
                format!("Block on failed, reason: {:?}", e),
            )),
            Ok(result) => Ok(result),
        }
    }
}
impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>
MultiTaskRuntime<O, P>
{
// Number of idle workers, delegated to the task pool.
pub fn idler_len(&self) -> usize {
(self.0).1.idler_len()
}
// Number of worker queues in the task pool.
pub fn worker_len(&self) -> usize {
(self.0).1.worker_len()
}
// Number of buffered tasks in the task pool.
pub fn buffer_len(&self) -> usize {
(self.0).1.buffer_len()
}
// Builds a type-erased, single-thread view of this runtime backed by the
// raw-pointer trampoline functions below.
pub fn to_local_runtime(&self) -> LocalAsyncRuntime<O> {
LocalAsyncRuntime {
inner: self.as_raw(),
get_id_func: MultiTaskRuntime::<O, P>::get_id_raw,
spawn_func: MultiTaskRuntime::<O, P>::spawn_raw,
spawn_local_func: MultiTaskRuntime::<O, P>::spawn_local_raw,
spawn_timing_func: MultiTaskRuntime::<O, P>::spawn_timing_raw,
timeout_func: MultiTaskRuntime::<O, P>::timeout_raw,
}
}
// Leaks one strong reference and returns it as an opaque pointer; each
// trampoline below briefly revives the Arc and re-leaks it, so the pointer
// stays valid for as long as the local runtime is alive.
#[inline]
pub(crate) fn as_raw(&self) -> *const () {
Arc::into_raw(self.0.clone()) as *const ()
}
// Rebuilds a runtime handle from a pointer produced by `as_raw`.
// SAFETY(review): this takes ownership of one strong count; every caller
// below must balance it with `Arc::into_raw` before returning.
#[inline]
pub(crate) fn from_raw(raw: *const ()) -> Self {
let inner = unsafe {
Arc::from_raw(
raw as *const (
usize,
Arc<P>,
Option<
Vec<(
Sender<(usize, AsyncTimingTask<P, O>)>,
Arc<AsyncTaskTimerByNotCancel<P, O>>,
)>,
>,
AtomicUsize,
Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>,
AtomicUsize,
AtomicUsize,
),
)
};
MultiTaskRuntime(inner)
}
// Trampoline: runtime id for `LocalAsyncRuntime`.
pub(crate) fn get_id_raw(raw: *const ()) -> usize {
let rt = MultiTaskRuntime::<O, P>::from_raw(raw);
let id = rt.get_id();
// Re-leak the Arc so `raw` stays valid for subsequent calls.
Arc::into_raw(rt.0); id
}
// Trampoline: default-priority spawn for `LocalAsyncRuntime`.
pub(crate) fn spawn_raw<F>(raw: *const (), future: F) -> Result<()>
where
F: Future<Output = O> + Send + 'static,
{
let rt = MultiTaskRuntime::<O, P>::from_raw(raw);
let result = rt.spawn_by_id(rt.alloc::<F::Output>(), future);
// Re-leak the Arc so `raw` stays valid for subsequent calls.
Arc::into_raw(rt.0); result
}
// Trampoline: thread-local spawn for `LocalAsyncRuntime`.
pub(crate) fn spawn_local_raw<F>(raw: *const (), future: F) -> Result<()>
where
F: Future<Output = O> + Send + 'static,
{
let rt = MultiTaskRuntime::<O, P>::from_raw(raw);
let result = rt.spawn_local_by_id(rt.alloc::<F::Output>(), future);
// Re-leak the Arc so `raw` stays valid for subsequent calls.
Arc::into_raw(rt.0); result
}
// Trampoline: timed spawn for `LocalAsyncRuntime`.
pub(crate) fn spawn_timing_raw(
raw: *const (),
future: BoxFuture<'static, O>,
timeout: usize,
) -> Result<()> {
let rt = MultiTaskRuntime::<O, P>::from_raw(raw);
let result = rt.spawn_timing_by_id(rt.alloc::<O>(), future, timeout);
// Re-leak the Arc so `raw` stays valid for subsequent calls.
Arc::into_raw(rt.0); result
}
// Trampoline: timeout future for `LocalAsyncRuntime`.
pub(crate) fn timeout_raw(raw: *const (), timeout: usize) -> BoxFuture<'static, ()> {
let rt = MultiTaskRuntime::<O, P>::from_raw(raw);
let boxed = rt.timeout(timeout);
// Re-leak the Arc so `raw` stays valid for subsequent calls.
Arc::into_raw(rt.0); boxed
}
}
// Builder for a `MultiTaskRuntime`: wraps a task pool plus the thread-name
// `prefix`, `init`/`min`/`max` worker counts, per-thread `stack_size`, idle
// sleep `timeout`, and optional timer `interval`.
pub struct MultiTaskRuntimeBuilder<
O: Default + 'static = (),
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = StealableTaskPool<O>,
> {
pool: P, prefix: String, init: usize, min: usize, max: usize, stack_size: usize, timeout: u64, interval: Option<usize>, marker: PhantomData<O>,
}
// SAFETY(review): the builder is plain configuration data plus the pool;
// soundness rests on `P`'s own Send/Sync guarantees, which the pool types in
// this module assert themselves.
unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Send
for MultiTaskRuntimeBuilder<O, P>
{
}
unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Sync
for MultiTaskRuntimeBuilder<O, P>
{
}
impl<O: Default + 'static> Default for MultiTaskRuntimeBuilder<O> {
    /// Builder preset: one stealable worker queue per logical CPU (one on
    /// wasm32), 65535-slot internal queues, equal initial weights, a 3000ms
    /// weight-adjustment interval, 2 MiB thread stacks and a 1-unit timer
    /// interval.
    fn default() -> Self {
        #[cfg(not(target_arch = "wasm32"))]
        let worker_count = num_cpus::get();
        #[cfg(target_arch = "wasm32")]
        let worker_count = 1;
        let pool = StealableTaskPool::with(worker_count, 65535, [1, 1], 3000);
        MultiTaskRuntimeBuilder::new(pool)
            .thread_stack_size(2 * 1024 * 1024)
            .set_timer_interval(1)
    }
}
impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>
MultiTaskRuntimeBuilder<O, P>
{
// Wraps `pool` in a builder with defaults: init/min/max worker counts set
// to the logical CPU count (1 on wasm32), default thread name prefix,
// stack size and idle sleep time, and no timer interval.
pub fn new(mut pool: P) -> Self {
#[cfg(not(target_arch = "wasm32"))]
let core_len = num_cpus::get(); #[cfg(target_arch = "wasm32")]
let core_len = 1;
MultiTaskRuntimeBuilder {
pool,
prefix: DEFAULT_WORKER_THREAD_PREFIX.to_string(),
init: core_len,
min: core_len,
max: core_len,
stack_size: DEFAULT_THREAD_STACK_SIZE,
timeout: DEFAULT_WORKER_THREAD_SLEEP_TIME,
interval: None,
marker: PhantomData,
}
}
// Sets the prefix used for worker thread names.
pub fn thread_prefix(mut self, prefix: &str) -> Self {
self.prefix = prefix.to_string();
self
}
// Sets the stack size (bytes) of spawned worker threads.
pub fn thread_stack_size(mut self, stack_size: usize) -> Self {
self.stack_size = stack_size;
self
}
// Sets how many worker threads `build` spawns; 0 falls back to the
// default minimum.
pub fn init_worker_size(mut self, mut init: usize) -> Self {
if init == 0 {
init = DEFAULT_INIT_WORKER_SIZE;
}
self.init = init;
self
}
// Sets the min/max worker bounds, normalizing them: max is raised to at
// least `init`, and min falls back to max when 0 or above max.
pub fn set_worker_limit(mut self, mut min: usize, mut max: usize) -> Self {
if self.init > max {
max = self.init;
}
if min == 0 || min > max {
min = max;
}
self.min = min;
self.max = max;
self
}
// Sets the idle sleep time for worker threads (presumably milliseconds —
// TODO confirm against `spawn_worker_thread`).
pub fn set_timeout(mut self, timeout: u64) -> Self {
self.timeout = timeout;
self
}
// Enables per-thread timers with the given tick interval.
pub fn set_timer_interval(mut self, interval: usize) -> Self {
self.interval = Some(interval);
self
}
// Builds the runtime: allocates one local timer (and producer channel)
// per potential worker up to `max` when a timer interval was set, wires
// the shared sleeping-worker waker queue into the pool, then spawns
// `init` worker threads named "<prefix>-<index>".
pub fn build(mut self) -> MultiTaskRuntime<O, P> {
let interval = self.interval;
let mut timers = if let Some(_) = interval {
Some(Vec::with_capacity(self.max))
} else {
None
};
for _ in 0..self.max {
if let Some(vec) = &mut timers {
let timer = AsyncTaskTimerByNotCancel::new();
let producor = timer.producor.clone();
let timer = Arc::new(timer);
vec.push((producor, timer));
};
}
let rt_uid = alloc_rt_uid();
let waits = Arc::new(ArrayQueue::new(self.max));
let mut pool = self.pool;
// Hand the pool the waker queue before freezing it behind an Arc.
pool.set_waits(waits.clone()); let pool = Arc::new(pool);
let runtime = MultiTaskRuntime(Arc::new((
rt_uid,
pool,
timers,
AtomicUsize::new(0),
waits,
AtomicUsize::new(0),
AtomicUsize::new(0),
)));
// Prepare one named thread builder per initial worker.
let mut builders = Vec::with_capacity(self.init);
for index in 0..self.init {
let builder = Builder::new()
.name(self.prefix.clone() + "-" + index.to_string().as_str())
.stack_size(self.stack_size);
builders.push(builder);
}
let min = self.min;
// The range end is evaluated once, so removing from the front while
// iterating hands out each builder exactly once, in order.
for index in 0..builders.len() {
let builder = builders.remove(0);
let runtime = runtime.clone();
let timeout = self.timeout;
let timer = if let Some(timers) = &(runtime.0).2 {
let (_, timer) = &timers[index];
Some(timer.clone())
} else {
None
};
spawn_worker_thread(builder, index, runtime, min, timeout, interval, timer);
}
runtime
}
}
/// Spawns one worker thread for the runtime.
///
/// The thread binds its thread-local id (high 32 bits = runtime uid, low 32
/// bits = worker index) and registers the runtime as the thread-local async
/// runtime, then enters `timer_work_loop` when a `timer` was supplied and
/// `work_loop` otherwise. The previous version duplicated the entire
/// thread-local setup in both branches; it is now written once.
fn spawn_worker_thread<
    O: Default + 'static,
    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
>(
    builder: Builder,
    index: usize,
    runtime: MultiTaskRuntime<O, P>,
    min: usize,
    timeout: u64,
    interval: Option<usize>,
    timer: Option<Arc<AsyncTaskTimerByNotCancel<P, O>>>,
) {
    let rt_uid = runtime.get_id();
    let _ = builder.spawn(move || {
        // Publish this worker's unique id in the thread-local slot.
        if let Err(e) = PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| unsafe {
            *thread_id.get() = rt_uid << 32 | index & 0xffffffff;
        }) {
            panic!(
                "Multi thread runtime startup failed, thread id: {:?}, reason: {:?}",
                index, e
            );
        }
        // Leak a local-runtime handle into the thread-local pointer so code on
        // this thread can reach the runtime without a reference.
        let runtime_copy = runtime.clone();
        if let Err(e) = PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |rt| {
            let raw = Arc::into_raw(Arc::new(runtime_copy.to_local_runtime()))
                as *mut LocalAsyncRuntime<O> as *mut ();
            rt.store(raw, Ordering::Relaxed);
        }) {
            panic!("Bind multi runtime to local thread failed, reason: {:?}", e);
        }
        // Enter the worker loop; a timer worker requires a configured interval.
        match timer {
            Some(timer) => timer_work_loop(
                runtime,
                index,
                min,
                timeout,
                interval.expect("timer worker requires a timer interval") as u64,
                timer,
            ),
            None => work_loop(runtime, index, min, timeout),
        }
    });
}
/// Worker loop for a runtime whose worker owns a per-thread task timer.
///
/// Each pass: consume newly submitted timing tasks into the timer, drain all
/// expired entries (waking or re-queueing them), then run ready tasks from the
/// pool; when idle, park on the pool's thread waker for at most the time
/// remaining until the next timer tick. `min` is accepted but unused here.
fn timer_work_loop<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>(
    runtime: MultiTaskRuntime<O, P>,
    index: usize,
    min: usize,
    sleep_timeout: u64,
    timer_interval: u64,
    timer: Arc<AsyncTaskTimerByNotCancel<P, O>>,
) {
    let pool = (runtime.0).1.clone();
    // Waker triple shared with producers: (is_sleeping flag, mutex, condvar).
    let worker_waker = pool.clone_thread_waker().unwrap();
    // Consecutive timed-out parks. NOTE(review): written but never read.
    let mut sleep_count = 0;
    let clock = Clock::new();
    loop {
        // Timestamp of this timer pass; used below to budget the park time.
        let timer_run_millis = clock.recent();
        let mut pop_len = 0;
        // Move newly produced timing tasks into the timer; tuple slot 5
        // accumulates the number of consumed submissions.
        (runtime.0)
            .5
            .fetch_add(timer.consume(),
                Ordering::Relaxed);
        loop {
            // `is_require_pop` presumably returns the due tick time when
            // expired entries exist, `None` otherwise — confirm in timer impl.
            let current_time = timer.is_require_pop();
            if let Some(current_time) = current_time {
                // Drain every entry expired at `current_time`.
                loop {
                    let timed_out = timer.pop(current_time);
                    if let Some(timing_task) = timed_out {
                        match timing_task {
                            AsyncTimingTask::Pended(expired) => {
                                // Pending task whose timeout fired: wake it.
                                runtime.wakeup::<O>(&expired);
                            }
                            AsyncTimingTask::WaitRun(expired) => {
                                // Deferred task: re-queue at highest priority,
                                // then opportunistically run one ready task.
                                (runtime.0)
                                    .1
                                    .push_priority(DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
                                        expired);
                                if let Some(task) = pool.try_pop() {
                                    sleep_count = 0;
                                    run_task(&runtime, task);
                                }
                            }
                            AsyncTimingTask::TimeoutWake(waiter) => {
                                // Plain timeout waiter: just fire it.
                                waiter.fire();
                            }
                        }
                        pop_len += 1;
                        // Interleave normal task execution so timer draining
                        // cannot starve the run queue.
                        if let Some(task) = pool.try_pop() {
                            sleep_count = 0;
                            run_task(&runtime, task);
                        }
                    } else {
                        break;
                    }
                }
            } else {
                break;
            }
        }
        // Tuple slot 6 accumulates the total number of expired entries popped.
        (runtime.0)
            .6
            .fetch_add(pop_len,
                Ordering::Relaxed);
        match pool.try_pop() {
            None => {
                if runtime.len() > 0 {
                    // Pool miss but the runtime still reports queued work: retry.
                    continue;
                }
                {
                    let (is_sleep, lock, condvar) = &*worker_waker;
                    let mut locked = lock.lock();
                    is_sleep.store(true, Ordering::SeqCst);
                    // Budget the park: with timer entries pending, sleep only
                    // until the next tick is due; if it is already overdue,
                    // skip parking entirely (the `continue` drops `locked`).
                    let diff_time = clock
                        .recent()
                        .duration_since(timer_run_millis)
                        .as_millis() as u64;
                    let real_timeout = if timer.len() == 0 {
                        sleep_timeout
                    } else {
                        if diff_time >= timer_interval {
                            continue;
                        } else {
                            timer_interval - diff_time
                        }
                    };
                    // Expose this worker's waker so producers can unpark it.
                    (runtime.0).4.push(worker_waker.clone());
                    if condvar
                        .wait_for(&mut locked, Duration::from_millis(real_timeout))
                        .timed_out()
                    {
                        is_sleep.store(false, Ordering::SeqCst);
                        sleep_count += 1;
                    }
                }
            }
            Some(task) => {
                sleep_count = 0;
                run_task(&runtime, task);
            }
        }
    }
    // NOTE(review): the loop above contains no `break`, so this shutdown path
    // is unreachable as written — presumably an exit condition was removed.
    (runtime.0).1.close_worker();
    warn!(
        "Worker of runtime closed, runtime: {}, worker: {}, thread: {:?}",
        runtime.get_id(),
        index,
        thread::current()
    );
}
/// Worker loop for a runtime without a per-thread timer.
///
/// Pops and runs tasks from the shared pool; when both the pool and the
/// runtime's queues look empty, parks on the pool's thread waker for at most
/// `sleep_timeout` milliseconds. `min` is accepted but unused here.
fn work_loop<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>(
    runtime: MultiTaskRuntime<O, P>,
    index: usize,
    min: usize,
    sleep_timeout: u64,
) {
    let pool = (runtime.0).1.clone();
    // Waker triple shared with producers: (is_sleeping flag, mutex, condvar).
    let thread_waker = pool.clone_thread_waker().unwrap();
    let mut idle_count = 0;
    loop {
        // Fast path: run the next ready task if one is available.
        if let Some(task) = pool.try_pop() {
            idle_count = 0;
            run_task(&runtime, task);
            continue;
        }
        // Pool miss but the runtime still reports queued work: retry.
        if runtime.len() > 0 {
            continue;
        }
        // Nothing to do: flag this worker as sleeping, publish its waker so
        // producers can unpark it, and park on the condvar with a timeout.
        let (is_sleep, lock, condvar) = &*thread_waker;
        let mut locked = lock.lock();
        is_sleep.store(true, Ordering::SeqCst);
        (runtime.0).4.push(thread_waker.clone());
        if condvar
            .wait_for(&mut locked, Duration::from_millis(sleep_timeout))
            .timed_out()
        {
            is_sleep.store(false, Ordering::SeqCst);
            idle_count += 1;
        }
    }
    // NOTE: the loop above never breaks, so this shutdown path is unreachable
    // as written.
    (runtime.0).1.close_worker();
    warn!(
        "Worker of runtime closed, runtime: {}, worker: {}, thread: {:?}",
        runtime.get_id(),
        index,
        thread::current()
    );
}
#[inline]
/// Polls `task`'s future once on the current thread.
///
/// A still-pending future is stored back on the task so the next wakeup can
/// resume it; a task with no future to poll is pushed back into the pool.
fn run_task<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>(
    runtime: &MultiTaskRuntime<O, P>,
    task: Arc<AsyncTask<P, O>>,
) {
    let waker = waker_ref(&task);
    let mut cx = Context::from_waker(&*waker);
    match task.get_inner() {
        Some(mut future) => {
            if future.as_mut().poll(&mut cx).is_pending() {
                task.set_inner(Some(future));
            }
        }
        None => (runtime.0).1.push(task),
    }
}
/// Producer handle for submitting timing tasks to a worker's timer.
/// (The "Producor" spelling matches the rest of the crate, e.g. the
/// `timer.producor` field, and is kept for consistency.)
///
/// `Local` holds a timer directly; `Foreign` is a channel sender whose
/// messages presumably carry `(worker index, timing task)` — confirm at the
/// receiving end.
enum TimerTaskProducor<
O: Default + 'static = (),
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = StealableTaskPool<O>,
> {
Local(Arc<AsyncTaskTimerByNotCancel<P, O>>), Foreign(Sender<(usize, AsyncTimingTask<P, O>)>), }