use std::thread;
use std::sync::Arc;
use std::vec::IntoIter;
use std::future::Future;
use std::cell::UnsafeCell;
use std::task::{Context, Poll, Waker};
use std::io::{Error, ErrorKind, Result};
use std::collections::vec_deque::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use async_stream::stream;
use crossbeam_channel::Sender;
use crossbeam_queue::SegQueue;
use flume::bounded as async_bounded;
use futures::{
future::{BoxFuture, FutureExt},
stream::{BoxStream, Stream, StreamExt},
task::waker_ref,
};
use parking_lot::{Condvar, Mutex};
use quanta::Clock;
use wrr::IWRRSelector;
use super::{
PI_ASYNC_THREAD_LOCAL_ID, DEFAULT_MAX_HIGH_PRIORITY_BOUNDED, DEFAULT_HIGH_PRIORITY_BOUNDED, DEFAULT_MAX_LOW_PRIORITY_BOUNDED, alloc_rt_uid, AsyncMapReduce, AsyncPipelineResult, AsyncRuntime,
AsyncRuntimeExt, AsyncTask, AsyncTaskPool, AsyncTaskPoolExt, AsyncTaskTimer, AsyncWait,
AsyncWaitAny, AsyncWaitAnyCallback, AsyncWaitTimeout, LocalAsyncRuntime, TaskId, YieldNow
};
use crate::rt::{TaskHandle, AsyncTimingTask};
pub struct SingleTaskPool<O: Default + 'static> {
id: usize, public: SegQueue<Arc<AsyncTask<SingleTaskPool<O>, O>>>, internal: UnsafeCell<VecDeque<Arc<AsyncTask<SingleTaskPool<O>, O>>>>, stack: UnsafeCell<Vec<Arc<AsyncTask<SingleTaskPool<O>, O>>>>, selector: UnsafeCell<IWRRSelector<2>>, consume_count: AtomicUsize, produce_count: AtomicUsize, thread_waker: Option<Arc<(AtomicBool, Mutex<()>, Condvar)>>, }
unsafe impl<O: Default + 'static> Send for SingleTaskPool<O> {}
unsafe impl<O: Default + 'static> Sync for SingleTaskPool<O> {}
impl<O: Default + 'static> Default for SingleTaskPool<O> {
fn default() -> Self {
SingleTaskPool::new([1, 1])
}
}
impl<O: Default + 'static> AsyncTaskPool<O> for SingleTaskPool<O> {
type Pool = SingleTaskPool<O>;
#[inline]
fn get_thread_id(&self) -> usize {
let rt_uid = self.id;
match PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
let current = unsafe { *thread_id.get() };
if current == usize::MAX {
unsafe {
*thread_id.get() = rt_uid << 32;
*thread_id.get()
}
} else {
current
}
}) {
Err(e) => {
panic!(
"Get thread id failed, thread: {:?}, reason: {:?}",
thread::current(),
e
);
}
Ok(id) => id,
}
}
#[inline]
fn len(&self) -> usize {
if let Some(len) = self
.produce_count
.load(Ordering::Relaxed)
.checked_sub(self.consume_count.load(Ordering::Relaxed))
{
len
} else {
0
}
}
#[inline]
fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
self.public.push(task);
self.produce_count.fetch_add(1, Ordering::Relaxed);
Ok(())
}
#[inline]
fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
let id = self.get_thread_id();
let rt_uid = task.owner();
if (id >> 32) == rt_uid {
unsafe {{
(&mut *self.internal.get()).push_back(task);
}}
self.produce_count.fetch_add(1, Ordering::Relaxed);
Ok(())
} else {
self.push(task)
}
}
#[inline]
fn push_priority(&self,
priority: usize,
task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
if priority >= DEFAULT_MAX_HIGH_PRIORITY_BOUNDED {
let id = self.get_thread_id();
let rt_uid = task.owner();
if (id >> 32) == rt_uid {
unsafe {
let stack = (&mut *self.stack.get());
if stack
.capacity()
.checked_sub(stack.len())
.unwrap_or(0) >= 0 {
(&mut *self.stack.get()).push(task);
} else {
(&mut *self.internal.get()).push_back(task);
}
}
self.produce_count.fetch_add(1, Ordering::Relaxed);
Ok(())
} else {
self.push(task)
}
} else if priority >= DEFAULT_HIGH_PRIORITY_BOUNDED {
self.push_local(task)
} else {
self.push(task)
}
}
#[inline]
fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
self.push_priority(DEFAULT_HIGH_PRIORITY_BOUNDED, task)
}
#[inline]
fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>> {
let task = unsafe { (&mut *self
.stack
.get())
.pop()
};
if task.is_some() {
self.consume_count.fetch_add(1, Ordering::Relaxed);
return task;
}
let task = try_pop_by_weight(self);
if task.is_some() {
self
.consume_count
.fetch_add(1, Ordering::Relaxed);
}
task
}
#[inline]
fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>> {
let mut all = Vec::with_capacity(self.len());
let internal = unsafe { (&mut *self.internal.get()) };
for _ in 0..internal.len() {
if let Some(task) = internal.pop_front() {
all.push(task);
}
}
let public_len = self.public.len();
for _ in 0..public_len {
if let Some(task) = self.public.pop() {
all.push(task);
}
}
all.into_iter()
}
#[inline]
fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>> {
self.thread_waker.as_ref()
}
}
fn try_pop_by_weight<O: Default + 'static>(pool: &SingleTaskPool<O>)
-> Option<Arc<AsyncTask<SingleTaskPool<O>, O>>> {
unsafe {
match (&mut *pool.selector.get()).select() {
0 => {
let task = try_pop_external(pool);
if task.is_some() {
task
} else {
try_pop_internal(pool)
}
},
_ => {
let task = try_pop_internal(pool);
if task.is_some() {
task
} else {
try_pop_external(pool)
}
},
}
}
}
#[inline]
fn try_pop_internal<O: Default + 'static>(pool: &SingleTaskPool<O>)
-> Option<Arc<AsyncTask<SingleTaskPool<O>, O>>> {
unsafe { (&mut *pool.internal.get()).pop_front() }
}
#[inline]
fn try_pop_external<O: Default + 'static>(pool: &SingleTaskPool<O>)
-> Option<Arc<AsyncTask<SingleTaskPool<O>, O>>> {
pool.public.pop()
}
impl<O: Default + 'static> AsyncTaskPoolExt<O> for SingleTaskPool<O> {
fn set_thread_waker(&mut self, thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) {
self.thread_waker = Some(thread_waker);
}
}
impl<O: Default + 'static> SingleTaskPool<O> {
pub fn new(weights: [u8; 2]) -> Self {
let id = alloc_rt_uid();
let public = SegQueue::new();
let internal = UnsafeCell::new(VecDeque::new());
let stack = UnsafeCell::new(Vec::with_capacity(1));
let selector = UnsafeCell::new(IWRRSelector::new(weights));
let consume_count = AtomicUsize::new(0);
let produce_count = AtomicUsize::new(0);
SingleTaskPool {
id,
public,
internal,
stack,
selector,
consume_count,
produce_count,
thread_waker: Some(Arc::new((
AtomicBool::new(false),
Mutex::new(()),
Condvar::new(),
))),
}
}
}
pub struct SingleTaskRuntime<
O: Default + 'static = (),
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = SingleTaskPool<O>,
>(
Arc<(
usize, Arc<P>, Sender<(usize, AsyncTimingTask<P, O>)>, AsyncTaskTimer<P, O>, AtomicUsize, AtomicUsize, )>,
);
unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Send
for SingleTaskRuntime<O, P>
{
}
unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Sync
for SingleTaskRuntime<O, P>
{
}
impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Clone
for SingleTaskRuntime<O, P>
{
fn clone(&self) -> Self {
SingleTaskRuntime(self.0.clone())
}
}
impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>> AsyncRuntime<O>
for SingleTaskRuntime<O, P>
{
type Pool = P;
fn shared_pool(&self) -> Arc<Self::Pool> {
(self.0).1.clone()
}
fn get_id(&self) -> usize {
(self.0).0
}
fn wait_len(&self) -> usize {
(self.0)
.4
.load(Ordering::Relaxed)
.checked_sub((self.0).5.load(Ordering::Relaxed))
.unwrap_or(0)
}
fn len(&self) -> usize {
(self.0).1.len()
}
fn alloc<R: 'static>(&self) -> TaskId {
TaskId(UnsafeCell::new((TaskHandle::<R>::default().into_raw() as u128) << 64 | self.get_id() as u128 & 0xffffffffffffffff))
}
fn spawn<F>(&self, future: F) -> Result<TaskId>
where
F: Future<Output = O> + Send + 'static,
{
let task_id = self.alloc::<F::Output>();
if let Err(e) = self.spawn_by_id(task_id.clone(), future) {
return Err(e);
}
Ok(task_id)
}
fn spawn_local<F>(&self, future: F) -> Result<TaskId>
where
F: Future<Output = O> + Send + 'static {
let task_id = self.alloc::<F::Output>();
if let Err(e) = self.spawn_local_by_id(task_id.clone(), future) {
return Err(e);
}
Ok(task_id)
}
fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
where
F: Future<Output = O> + Send + 'static {
let task_id = self.alloc::<F::Output>();
if let Err(e) = self.spawn_priority_by_id(task_id.clone(), priority, future) {
return Err(e);
}
Ok(task_id)
}
fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
where
F: Future<Output = O> + Send + 'static {
let task_id = self.alloc::<F::Output>();
if let Err(e) = self.spawn_yield_by_id(task_id.clone(), future) {
return Err(e);
}
Ok(task_id)
}
fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
where
F: Future<Output = O> + Send + 'static,
{
let task_id = self.alloc::<F::Output>();
if let Err(e) = self.spawn_timing_by_id(task_id.clone(), future, time) {
return Err(e);
}
Ok(task_id)
}
fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
where
F: Future<Output = O> + Send + 'static {
if let Err(e) = (self.0).1.push(Arc::new(AsyncTask::new(
task_id,
(self.0).1.clone(),
DEFAULT_MAX_LOW_PRIORITY_BOUNDED,
Some(future.boxed()),
))) {
return Err(Error::new(ErrorKind::Other, e));
}
Ok(())
}
fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
where
F: Future<Output = O> + Send + 'static {
(self.0).1.push_local(Arc::new(AsyncTask::new(
task_id,
(self.0).1.clone(),
DEFAULT_HIGH_PRIORITY_BOUNDED,
Some(future.boxed()))))
}
fn spawn_priority_by_id<F>(&self,
task_id: TaskId,
priority: usize,
future: F) -> Result<()>
where
F: Future<Output = O> + Send + 'static {
(self.0).1.push_priority(priority, Arc::new(AsyncTask::new(
task_id,
(self.0).1.clone(),
priority,
Some(future.boxed()))))
}
#[inline]
fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
where
F: Future<Output = O> + Send + 'static {
self.spawn_priority_by_id(task_id,
DEFAULT_HIGH_PRIORITY_BOUNDED,
future)
}
fn spawn_timing_by_id<F>(&self,
task_id: TaskId,
future: F,
time: usize) -> Result<()>
where
F: Future<Output = O> + Send + 'static {
let rt = self.clone();
self.spawn_by_id(task_id, async move {
(rt.0).3.set_timer(
AsyncTimingTask::WaitRun(Arc::new(AsyncTask::new(
rt.alloc::<F::Output>(),
(rt.0).1.clone(),
DEFAULT_HIGH_PRIORITY_BOUNDED,
Some(future.boxed()),
))),
time,
);
(rt.0).4.fetch_add(1, Ordering::Relaxed);
Default::default()
})
}
fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output> {
task_id.set_waker::<Output>(waker);
Poll::Pending
}
fn wakeup<Output: 'static>(&self, task_id: &TaskId) {
task_id.wakeup::<Output>();
}
fn wait<V: Send + 'static>(&self) -> AsyncWait<V> {
AsyncWait(self.wait_any(2))
}
fn wait_any<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
let (producor, consumer) = async_bounded(capacity);
AsyncWaitAny {
capacity,
producor,
consumer,
}
}
fn wait_any_callback<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
let (producor, consumer) = async_bounded(capacity);
AsyncWaitAnyCallback {
capacity,
producor,
consumer,
}
}
fn map_reduce<V: Send + 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
let (producor, consumer) = async_bounded(capacity);
AsyncMapReduce {
count: 0,
capacity,
producor,
consumer,
}
}
fn timeout(&self, timeout: usize) -> BoxFuture<'static, ()> {
let rt = self.clone();
let producor = (self.0).2.clone();
AsyncWaitTimeout::new(rt, producor, timeout).boxed()
}
fn yield_now(&self) -> BoxFuture<'static, ()> {
async move {
YieldNow(false).await;
}.boxed()
}
fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> BoxStream<'static, FO>
where
S: Stream<Item = SO> + Send + 'static,
SO: Send + 'static,
F: FnMut(SO) -> AsyncPipelineResult<FO> + Send + 'static,
FO: Send + 'static,
{
let output = stream! {
for await value in input {
match filter(value) {
AsyncPipelineResult::Disconnect => {
break;
},
AsyncPipelineResult::Filtered(result) => {
yield result;
},
}
}
};
output.boxed()
}
fn close(&self) -> bool {
false
}
}
impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>> AsyncRuntimeExt<O>
for SingleTaskRuntime<O, P>
{
fn spawn_with_context<F, C>(&self, task_id: TaskId, future: F, context: C) -> Result<()>
where
F: Future<Output = O> + Send + 'static,
C: 'static,
{
if let Err(e) = (self.0).1.push(Arc::new(AsyncTask::with_context(
task_id,
(self.0).1.clone(),
DEFAULT_MAX_LOW_PRIORITY_BOUNDED,
Some(future.boxed()),
context,
))) {
return Err(Error::new(ErrorKind::Other, e));
}
Ok(())
}
fn spawn_timing_with_context<F, C>(
&self,
task_id: TaskId,
future: F,
context: C,
time: usize,
) -> Result<()>
where
F: Future<Output = O> + Send + 'static,
C: Send + 'static,
{
let rt = self.clone();
self.spawn_by_id(task_id, async move {
(rt.0).3.set_timer(
AsyncTimingTask::WaitRun(Arc::new(AsyncTask::with_context(
rt.alloc::<F::Output>(),
(rt.0).1.clone(),
DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
Some(future.boxed()),
context,
))),
time,
);
(rt.0).4.fetch_add(1, Ordering::Relaxed);
Default::default()
})
}
fn block_on<F>(&self, future: F) -> Result<F::Output>
where
F: Future + Send + 'static,
<F as Future>::Output: Default + Send + 'static,
{
let runner = SingleTaskRunner {
is_running: AtomicBool::new(true),
runtime: self.clone(),
clock: Clock::new(),
};
let mut result: Option<<F as Future>::Output> = None;
let result_raw = (&mut result) as *mut Option<<F as Future>::Output> as usize;
self.spawn(async move {
let r = future.await;
unsafe {
*(result_raw as *mut Option<<F as Future>::Output>) = Some(r);
}
Default::default()
});
loop {
while runner.run()? > 0 {}
if let Some(result) = result.take() {
return Ok(result);
}
}
}
}
impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>
SingleTaskRuntime<O, P>
{
pub fn to_local_runtime(&self) -> LocalAsyncRuntime<O> {
LocalAsyncRuntime {
inner: self.as_raw(),
get_id_func: SingleTaskRuntime::<O, P>::get_id_raw,
spawn_func: SingleTaskRuntime::<O, P>::spawn_raw,
spawn_timing_func: SingleTaskRuntime::<O, P>::spawn_timing_raw,
timeout_func: SingleTaskRuntime::<O, P>::timeout_raw,
}
}
#[inline]
pub(crate) fn as_raw(&self) -> *const () {
Arc::into_raw(self.0.clone()) as *const ()
}
#[inline]
pub(crate) fn from_raw(raw: *const ()) -> Self {
let inner = unsafe {
Arc::from_raw(
raw as *const (
usize,
Arc<P>,
Sender<(usize, AsyncTimingTask<P, O>)>,
AsyncTaskTimer<P, O>,
AtomicUsize,
AtomicUsize,
),
)
};
SingleTaskRuntime(inner)
}
pub(crate) fn get_id_raw(raw: *const ()) -> usize {
let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
let id = rt.get_id();
Arc::into_raw(rt.0); id
}
pub(crate) fn spawn_raw(raw: *const (), future: BoxFuture<'static, O>) -> Result<()> {
let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
let result = rt.spawn_by_id(rt.alloc::<O>(), future);
Arc::into_raw(rt.0); result
}
pub(crate) fn spawn_timing_raw(
raw: *const (),
future: BoxFuture<'static, O>,
timeout: usize,
) -> Result<()> {
let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
let result = rt.spawn_timing_by_id(rt.alloc::<O>(), future, timeout);
Arc::into_raw(rt.0); result
}
pub(crate) fn timeout_raw(raw: *const (), timeout: usize) -> BoxFuture<'static, ()> {
let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
let boxed = rt.timeout(timeout);
Arc::into_raw(rt.0); boxed
}
}
pub struct SingleTaskRunner<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = SingleTaskPool<O>,
> {
is_running: AtomicBool, runtime: SingleTaskRuntime<O, P>, clock: Clock, }
unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Send
for SingleTaskRunner<O, P>
{
}
unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Sync
for SingleTaskRunner<O, P>
{
}
impl<O: Default + 'static> Default for SingleTaskRunner<O> {
fn default() -> Self {
SingleTaskRunner::new(SingleTaskPool::default())
}
}
impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>
SingleTaskRunner<O, P>
{
pub fn new(pool: P) -> Self {
let rt_uid = pool.get_thread_id() >> 32;
let pool = Arc::new(pool);
let timer = AsyncTaskTimer::new();
let producor = timer.producor.clone();
let timer_producor_count = AtomicUsize::new(0);
let timer_consume_count = AtomicUsize::new(0);
let runtime = SingleTaskRuntime(Arc::new((rt_uid,
pool,
producor,
timer,
timer_producor_count,
timer_consume_count)));
SingleTaskRunner {
is_running: AtomicBool::new(false),
runtime,
clock: Clock::new(),
}
}
pub fn get_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
(self.runtime.0).1.get_thread_waker().cloned()
}
pub fn startup(&self) -> Option<SingleTaskRuntime<O, P>> {
if cfg!(target_arch = "aarch64") {
match self
.is_running
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
{
Ok(false) => {
Some(self.runtime.clone())
}
_ => {
None
}
}
} else {
match self.is_running.compare_exchange_weak(
false,
true,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(false) => {
Some(self.runtime.clone())
}
_ => {
None
}
}
}
}
pub fn run_once(&self) -> Result<usize> {
if !self.is_running.load(Ordering::Relaxed) {
return Err(Error::new(
ErrorKind::Other,
"Single thread runtime not running",
));
}
let mut pop_len = 0;
(self.runtime.0)
.4
.fetch_add((self.runtime.0).3.consume(),
Ordering::Relaxed);
loop {
let current_time = (self.runtime.0).3.is_require_pop();
if let Some(current_time) = current_time {
let timed_out = (self.runtime.0).3.pop(current_time);
if let Some((handle, timing_task)) = timed_out {
match timing_task {
AsyncTimingTask::Pended(expired) => {
self.runtime.wakeup::<O>(&expired);
if let Some(task) = (self.runtime.0).1.try_pop() {
run_task(task);
}
}
AsyncTimingTask::WaitRun(expired) => {
(self.runtime.0).1.push_priority(handle, expired);
if let Some(task) = (self.runtime.0).1.try_pop() {
run_task(task);
}
}
}
pop_len += 1;
}
} else {
break;
}
}
(self.runtime.0)
.5
.fetch_add(pop_len,
Ordering::Relaxed);
match (self.runtime.0).1.try_pop() {
None => {
return Ok(0);
}
Some(task) => {
run_task(task);
}
}
Ok((self.runtime.0).1.len())
}
pub fn run(&self) -> Result<usize> {
if !self.is_running.load(Ordering::Relaxed) {
return Err(Error::new(
ErrorKind::Other,
"Single thread runtime not running",
));
}
loop {
let mut pop_len = 0;
let mut start_run_millis = self.clock.recent(); (self.runtime.0)
.4
.fetch_add((self.runtime.0).3.consume(),
Ordering::Relaxed);
loop {
let current_time = (self.runtime.0).3.is_require_pop();
if let Some(current_time) = current_time {
let timed_out = (self.runtime.0).3.pop(current_time);
if let Some((handle, timing_task)) = timed_out {
match timing_task {
AsyncTimingTask::Pended(expired) => {
self.runtime.wakeup::<O>(&expired);
if let Some(task) = (self.runtime.0).1.try_pop() {
run_task(task);
}
}
AsyncTimingTask::WaitRun(expired) => {
(self.runtime.0).1.push_priority(handle, expired);
if let Some(task) = (self.runtime.0).1.try_pop() {
run_task(task);
}
}
}
pop_len += 1;
}
} else {
break;
}
}
(self.runtime.0)
.5
.fetch_add(pop_len,
Ordering::Relaxed);
while self
.clock
.recent()
.duration_since(start_run_millis)
.as_millis() < 1 {
match (self.runtime.0).1.try_pop() {
None => {
return Ok((self.runtime.0).1.len());
}
Some(task) => {
run_task(task);
}
}
}
}
}
pub fn into_local(self) -> SingleTaskRuntime<O, P> {
self.runtime
}
}
#[inline]
fn run_task<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>(
task: Arc<AsyncTask<P, O>>,
) {
let waker = waker_ref(&task);
let mut context = Context::from_waker(&*waker);
if let Some(mut future) = task.get_inner() {
if let Poll::Pending = future.as_mut().poll(&mut context) {
task.set_inner(Some(future));
}
}
}