use std::thread;
use std::any::Any;
use std::pin::Pin;
use std::ptr::null_mut;
use std::vec::IntoIter;
use std::time::Duration;
use std::future::Future;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::cell::{RefCell, UnsafeCell};
use std::task::{Poll, Waker, Context};
use std::io::{Error, Result, ErrorKind};
use std::fmt::{Debug, Formatter, Result as FmtResult};
use std::sync::{Arc, atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering}};
use futures::{future::{FutureExt, LocalBoxFuture},
stream::{Stream, StreamExt, LocalBoxStream},
task::ArcWake};
use parking_lot::{Mutex, Condvar};
use crossbeam_queue::ArrayQueue;
use crossbeam_channel::{Sender, Receiver, unbounded};
use flume::{Sender as AsyncSender, Receiver as AsyncReceiver};
#[cfg(not(target_arch = "wasm32"))]
use polling::Poller;
use num_cpus;
use pi_cancel_timer::Timer;
use slotmap::{Key, KeyData};
use quanta::{Clock, Instant as QInstant};
use crate::{lock::spin,
rt::{PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME, TaskId, AsyncPipelineResult,
TimeoutWaiter, wake_thread_waker, wake_waiting_worker,
serial_local_thread::{LocalTaskRunner, LocalTaskRuntime},
serial_single_thread::SingleTaskRuntime,
serial_worker_thread::{WorkerTaskRunner, WorkerRuntime}}};
pub struct AsyncTask<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static = (),
> {
uid: TaskId, future: Mutex<Option<LocalBoxFuture<'static, O>>>, pool: Arc<P>, priority: usize, context: Option<UnsafeCell<Box<dyn Any>>>, }
unsafe impl<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Send for AsyncTask<P, O> {}
unsafe impl<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Sync for AsyncTask<P, O> {}
impl<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
O: Default + 'static,
> ArcWake for AsyncTask<P, O> {
fn wake_by_ref(arc_self: &Arc<Self>) {
let pool = arc_self.get_pool();
let _ = pool.push_keep(arc_self.clone());
if let Some(waits) = pool.get_waits() {
let _ = wake_waiting_worker(waits);
} else {
if let Some(thread_waker) = pool.get_thread_waker() {
let _ = wake_thread_waker(thread_waker);
}
}
}
}
impl<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
O: Default + 'static,
> AsyncTask<P, O> {
pub fn new(uid: TaskId,
pool: Arc<P>,
priority: usize,
future: Option<LocalBoxFuture<'static, O>>) -> AsyncTask<P, O> {
AsyncTask {
uid,
future: Mutex::new(future),
pool,
priority,
context: None,
}
}
pub fn with_context<C: 'static>(uid: TaskId,
pool: Arc<P>,
priority: usize,
future: Option<LocalBoxFuture<'static, O>>,
context: C) -> AsyncTask<P, O> {
let any = Box::new(context);
AsyncTask {
uid,
future: Mutex::new(future),
pool,
priority,
context: Some(UnsafeCell::new(any)),
}
}
pub fn with_runtime_and_context<RT, C>(runtime: &RT,
priority: usize,
future: Option<LocalBoxFuture<'static, O>>,
context: C) -> AsyncTask<P, O>
where RT: AsyncRuntime<O, Pool = P>,
C: 'static {
let any = Box::new(context);
AsyncTask {
uid: runtime.alloc::<O>(),
future: Mutex::new(future),
pool: runtime.shared_pool(),
priority,
context: Some(UnsafeCell::new(any)),
}
}
pub fn is_enable_wakeup(&self) -> bool {
self.uid.exist_waker::<O>()
}
pub fn get_inner(&self) -> Option<LocalBoxFuture<'static, O>> {
self.future.lock().take()
}
pub fn set_inner(&self, inner: Option<LocalBoxFuture<'static, O>>) {
*self.future.lock() = inner;
}
#[inline]
pub fn owner(&self) -> usize {
unsafe {
*self.uid.0.get() as usize
}
}
pub fn priority(&self) -> usize {
self.priority
}
pub fn exist_context(&self) -> bool {
self.context.is_some()
}
pub fn get_context<C: 'static>(&self) -> Option<&C> {
if let Some(context) = &self.context {
let any = unsafe { &*context.get() };
return <dyn Any>::downcast_ref::<C>(&**any);
}
None
}
pub fn get_context_mut<C: 'static>(&self) -> Option<&mut C> {
if let Some(context) = &self.context {
let any = unsafe { &mut *context.get() };
return <dyn Any>::downcast_mut::<C>(&mut **any);
}
None
}
pub fn set_context<C: 'static>(&self, new: C) {
if let Some(context) = &self.context {
let _ = unsafe { &*context.get() };
let any: Box<dyn Any + 'static> = Box::new(new);
unsafe { *context.get() = any; }
}
}
pub fn get_pool(&self) -> &P {
self.pool.as_ref()
}
}
pub trait AsyncTaskPool<O: Default + 'static = ()>: Default + 'static {
type Pool: AsyncTaskPoolExt<O> + AsyncTaskPool<O>;
fn get_thread_id(&self) -> usize;
fn len(&self) -> usize;
fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
fn push_priority(&self, priority: usize, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>>;
fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>>;
fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>> {
None
}
}
pub trait AsyncTaskPoolExt<O: Default + 'static = ()>: 'static {
fn set_waits(&mut self,
_waits: Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>) {}
fn get_waits(&self) -> Option<&Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>> {
None
}
fn idler_len(&self) -> usize {
0
}
fn spawn_worker(&self) -> Option<usize> {
None
}
fn worker_len(&self) -> usize {
#[cfg(not(target_arch = "wasm32"))]
return num_cpus::get();
#[cfg(target_arch = "wasm32")]
return 1;
}
fn buffer_len(&self) -> usize {
0
}
fn set_thread_waker(&mut self, _thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) {
}
fn clone_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
None
}
fn close_worker(&self) {
}
}
pub trait AsyncRuntime<O: Default + 'static = ()>: Clone + Send + Sync + 'static {
type Pool: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = Self::Pool>;
fn shared_pool(&self) -> Arc<Self::Pool>;
fn get_id(&self) -> usize;
fn wait_len(&self) -> usize;
fn len(&self) -> usize;
fn alloc<R: 'static>(&self) -> TaskId;
fn spawn<F>(&self, future: F) -> Result<TaskId>
where F: Future<Output = O> + 'static;
fn spawn_local<F>(&self, future: F) -> Result<TaskId>
where F: Future<Output = O> + 'static;
fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
where F: Future<Output = O> + 'static;
fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
where F: Future<Output = O> + 'static;
fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
where F: Future<Output = O> + 'static;
fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
where F: Future<Output = O> + 'static;
fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
where F: Future<Output = O> + 'static;
fn spawn_priority_by_id<F>(&self,
task_id: TaskId,
priority: usize,
future: F) -> Result<()>
where F: Future<Output = O> + 'static;
fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
where F: Future<Output = O> + 'static;
fn spawn_timing_by_id<F>(&self,
task_id: TaskId,
future: F,
time: usize) -> Result<()>
where F: Future<Output = O> + 'static;
fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output>;
fn wakeup<Output: 'static>(&self, task_id: &TaskId);
fn wait<V: 'static>(&self) -> AsyncWait<V>;
fn wait_any<V: 'static>(&self, capacity: usize) -> AsyncWaitAny<V>;
fn wait_any_callback<V: 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V>;
fn map_reduce<V: 'static>(&self, capacity: usize) -> AsyncMapReduce<V>;
fn timeout(&self, timeout: usize) -> LocalBoxFuture<'static, ()>;
fn yield_now(&self) -> LocalBoxFuture<'static, ()>;
fn pipeline<S, SO, F, FO>(&self, input: S, filter: F) -> LocalBoxStream<'static, FO>
where S: Stream<Item = SO> + 'static,
SO: 'static,
F: FnMut(SO) -> AsyncPipelineResult<FO> + 'static,
FO: 'static;
fn close(&self) -> bool;
}
pub trait AsyncRuntimeExt<O: Default + 'static = ()> {
fn spawn_with_context<F, C>(&self,
task_id: TaskId,
future: F,
context: C) -> Result<()>
where F: Future<Output = O> + 'static,
C: 'static;
fn spawn_timing_with_context<F, C>(&self,
task_id: TaskId,
future: F,
context: C,
time: usize) -> Result<()>
where F: Future<Output = O> + 'static,
C: 'static;
fn block_on<F>(&self, future: F) -> Result<F::Output>
where F: Future + 'static,
<F as Future>::Output: Default + 'static;
}
pub struct AsyncRuntimeBuilder<O: Default + 'static = ()>(PhantomData<O>);
impl<O: Default + 'static> AsyncRuntimeBuilder<O> {
pub fn default_local_thread(name: Option<&str>,
stack_size: Option<usize>) -> LocalTaskRuntime<O> {
let runner = LocalTaskRunner::new();
let thread_name = if let Some(name) = name {
name
} else {
"Default-Local-RT"
};
let thread_stack_size = if let Some(size) = stack_size {
size
} else {
2 * 1024 * 1024
};
runner.startup(thread_name, thread_stack_size)
}
pub fn default_worker_thread(worker_name: Option<&str>,
worker_stack_size: Option<usize>,
worker_sleep_timeout: Option<u64>,
worker_loop_interval: Option<Option<u64>>) -> WorkerRuntime<O> {
let runner = WorkerTaskRunner::default();
let thread_name = if let Some(name) = worker_name {
name
} else {
"Default-Single-Worker"
};
let thread_stack_size = if let Some(size) = worker_stack_size {
size
} else {
2 * 1024 * 1024
};
let sleep_timeout = if let Some(timeout) = worker_sleep_timeout {
timeout
} else {
1
};
let loop_interval = if let Some(interval) = worker_loop_interval {
interval
} else {
None
};
let clock = Clock::new();
let runner_copy = runner.clone();
let rt_copy = runner.get_runtime();
let rt = runner.startup(
thread_name,
thread_stack_size,
sleep_timeout,
loop_interval,
move || {
let now = clock.recent();
match runner_copy.run_once() {
Err(e) => {
panic!("Run runner failed, reason: {:?}", e);
},
Ok(len) => {
(len == 0,
clock
.recent()
.duration_since(now))
},
}
},
move || {
rt_copy.wait_len() + rt_copy.len()
},
);
rt
}
#[cfg(not(target_arch = "wasm32"))]
pub fn custom_local_thread(name: Option<&str>,
stack_size: Option<usize>,
poller: Option<Arc<Poller>>,
try_count: Option<usize>,
timeout: Option<Duration>,) -> LocalTaskRuntime<O> {
let poller = if let Some(poller) = poller {
poller
} else {
Arc::new(Poller::new().expect("Failed to create poller"))
};
let runner = LocalTaskRunner::with_poll(poller);
let thread_name = if let Some(name) = name {
name
} else {
"Custom-Local-RT"
};
let thread_stack_size = if let Some(size) = stack_size {
size
} else {
2 * 1024 * 1024
};
let try_count = try_count.unwrap_or(3);
runner.startup_with_poll(
thread_name,
thread_stack_size,
try_count,
timeout
)
}
pub fn custom_worker_thread<P, F0, F1>(pool: P,
worker_handle: Arc<AtomicBool>,
worker_condvar: Arc<(AtomicBool, Mutex<()>, Condvar)>,
thread_name: &str,
thread_stack_size: usize,
sleep_timeout: u64,
loop_interval: Option<u64>,
loop_func: F0,
get_queue_len: F1) -> WorkerRuntime<O, P>
where P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
F0: Fn() -> (bool, Duration) + Send + 'static,
F1: Fn() -> usize + Send + 'static {
let runner = WorkerTaskRunner::new(pool,
worker_handle,
worker_condvar);
let rt_copy = runner.get_runtime();
let rt = runner.startup(
thread_name,
thread_stack_size,
sleep_timeout,
loop_interval,
loop_func,
move || {
rt_copy.wait_len() + get_queue_len()
},
);
rt
}
}
pub fn bind_local_thread<O: Default + 'static>(runtime: LocalAsyncRuntime<O>) {
match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |rt| {
let raw = Arc::into_raw(Arc::new(runtime)) as *mut LocalAsyncRuntime<O> as *mut ();
rt.store(raw, Ordering::Relaxed);
}) {
Err(e) => {
panic!("Bind single runtime to local thread failed, reason: {:?}", e);
},
Ok(_) => (),
}
}
pub fn unbind_local_thread() {
let _ = PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |rt| {
rt.store(null_mut(), Ordering::Relaxed);
});
}
pub struct LocalAsyncRuntime<O: Default + 'static> {
inner: *const (), get_id_func: fn(*const ()) -> usize, spawn_func: fn(*const (), LocalBoxFuture<'static, O>) -> Result<()>, spawn_local_func: fn(*const (), LocalBoxFuture<'static, O>) -> Result<()>, spawn_timing_func: fn(*const (), LocalBoxFuture<'static, O>, usize) -> Result<()>, timeout_func: fn(*const (), usize) -> LocalBoxFuture<'static, ()>, }
unsafe impl<O: Default + 'static> Send for LocalAsyncRuntime<O> {}
unsafe impl<O: Default + 'static> Sync for LocalAsyncRuntime<O> {}
impl<O: Default + 'static> LocalAsyncRuntime<O> {
pub fn new(inner: *const (),
get_id_func: fn(*const ()) -> usize,
spawn_func: fn(*const (), LocalBoxFuture<'static, O>) -> Result<()>,
spawn_timing_func: fn(*const (), LocalBoxFuture<'static, O>, usize) -> Result<()>,
timeout_func: fn(*const (), usize) -> LocalBoxFuture<'static, ()>) -> Self {
LocalAsyncRuntime {
inner,
get_id_func,
spawn_func,
spawn_local_func: spawn_func,
spawn_timing_func,
timeout_func,
}
}
#[inline]
pub fn get_id(&self) -> usize {
(self.get_id_func)(self.inner)
}
#[inline]
pub fn spawn<F>(&self, future: F) -> Result<()>
where F: Future<Output = O> + 'static {
(self.spawn_func)(self.inner, async move {
future.await
}.boxed_local())
}
#[inline]
pub fn spawn_local<F>(&self, future: F) -> Result<()>
where F: Future<Output = O> + 'static {
(self.spawn_local_func)(self.inner, async move {
future.await
}.boxed_local())
}
#[inline]
pub fn sapwn_timing_func<F>(&self, future: F, timeout: usize) -> Result<()>
where F: Future<Output = O> + 'static {
(self.spawn_timing_func)(self.inner,
async move {
future.await
}.boxed_local(),
timeout)
}
#[inline]
pub fn timeout(&self, timeout: usize) -> LocalBoxFuture<'static, ()> {
(self.timeout_func)(self.inner, timeout)
}
}
pub fn local_serial_async_runtime<O: Default + 'static>() -> Option<Arc<LocalAsyncRuntime<O>>> {
match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |ptr| {
let raw = ptr.load(Ordering::Relaxed) as *const LocalAsyncRuntime<O>;
unsafe {
if raw.is_null() {
None
} else {
let shared: Arc<LocalAsyncRuntime<O>> = unsafe { Arc::from_raw(raw) };
let result = shared.clone();
Arc::into_raw(shared); Some(result)
}
}
}) {
Err(_) => None, Ok(rt) => rt,
}
}
pub fn spawn_local<O, F>(future: F) -> Result<()>
where O: Default + 'static,
F: Future<Output = O> + 'static {
if let Some(rt) = local_serial_async_runtime::<O>() {
rt.spawn(future)
} else {
Err(Error::new(ErrorKind::Other, format!("Spawn task to local thread failed, reason: runtime not exist")))
}
}
pub fn local_async_runtime<O: Default + 'static>() -> Option<Arc<LocalAsyncRuntime<O>>> {
match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |ptr| {
let raw = ptr.load(Ordering::Relaxed) as *const LocalAsyncRuntime<O>;
unsafe {
if raw.is_null() {
None
} else {
let shared: Arc<LocalAsyncRuntime<O>> = unsafe { Arc::from_raw(raw) };
let result = shared.clone();
Arc::into_raw(shared); Some(result)
}
}
}) {
Err(_) => None, Ok(rt) => rt,
}
}
pub struct AsyncValue<V: 'static>(Arc<InnerAsyncValue<V>>);
unsafe impl<V: 'static> Send for AsyncValue<V> {}
unsafe impl<V: 'static> Sync for AsyncValue<V> {}
impl<V: 'static> Clone for AsyncValue<V> {
fn clone(&self) -> Self {
AsyncValue(self.0.clone())
}
}
impl<V: Send + 'static> Debug for AsyncValue<V> {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f,
"AsyncValue[status = {}]",
self.0.status.load(Ordering::Acquire))
}
}
impl<V: 'static> Future for AsyncValue<V> {
type Output = V;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut spin_len = 1;
while self.0.status.load(Ordering::Acquire) == 2 {
spin_len = spin(spin_len);
}
if self.0.status.load(Ordering::Acquire) == 3 {
if let Some(value) = unsafe { (*(&self).0.value.get()).take() } {
return Poll::Ready(value);
}
}
unsafe {
*self.0.waker.get() = Some(cx.waker().clone()); }
let mut spin_len = 1;
loop {
match self.0.status.compare_exchange(0,
1, Ordering::Acquire,
Ordering::Relaxed) {
Err(2) => {
spin_len = spin(spin_len);
continue;
},
Err(3) => {
let value = unsafe { (*(&self).0.value.get()).take().unwrap() };
return Poll::Ready(value);
},
Err(_) => {
unimplemented!();
},
Ok(_) => {
return Poll::Pending;
},
}
}
}
}
impl<V: 'static> AsyncValue<V> {
pub fn new() -> Self {
let inner = InnerAsyncValue {
value: UnsafeCell::new(None),
waker: UnsafeCell::new(None),
status: AtomicU8::new(0),
};
AsyncValue(Arc::new(inner))
}
pub fn is_complete(&self) -> bool {
self
.0
.status
.load(Ordering::Relaxed) == 3
}
pub fn set(self, value: V) {
loop {
match self.0.status.compare_exchange(1,
2,
Ordering::Acquire,
Ordering::Relaxed) {
Err(0) => {
match self.0.status.compare_exchange(0,
2,
Ordering::Acquire,
Ordering::Relaxed) {
Err(1) => {
continue;
},
Err(_) => {
return;
},
Ok(_) => {
unsafe { *self.0.value.get() = Some(value); }
self.0.status.store(3, Ordering::Release);
return;
}
}
},
Err(_) => {
return;
},
Ok(_) => {
break;
}
}
}
unsafe { *self.0.value.get() = Some(value); }
self.0.status.store(3, Ordering::Release);
let waker = unsafe { (*self.0.waker.get()).take().unwrap() };
waker.wake();
}
}
pub struct InnerAsyncValue<V: 'static> {
value: UnsafeCell<Option<V>>, waker: UnsafeCell<Option<Waker>>, status: AtomicU8, }
pub struct AsyncVariableGuard<'a, V: 'static> {
value: &'a UnsafeCell<Option<V>>, waker: &'a UnsafeCell<Option<Waker>>, status: &'a AtomicU8, }
unsafe impl<V: 'static> Send for AsyncVariableGuard<'_, V> {}
impl<V: 'static> Drop for AsyncVariableGuard<'_, V> {
fn drop(&mut self) {
self.status.fetch_sub(2, Ordering::Relaxed);
}
}
impl<V: 'static> Deref for AsyncVariableGuard<'_, V> {
type Target = Option<V>;
fn deref(&self) -> &Self::Target {
unsafe {
&*self.value.get()
}
}
}
impl<V: 'static> DerefMut for AsyncVariableGuard<'_, V> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe {
&mut *self.value.get()
}
}
}
impl<V: 'static> AsyncVariableGuard<'_, V> {
pub fn finish(self) {
if self.status.fetch_add(4, Ordering::Relaxed) == 3 {
if let Some(waker) = unsafe { (&mut *self.waker.get()).take() } {
waker.wake();
}
}
}
}
pub struct AsyncVariable<V: 'static>(Arc<InnerAsyncVariable<V>>);
unsafe impl<V: 'static> Send for AsyncVariable<V> {}
unsafe impl<V: 'static> Sync for AsyncVariable<V> {}
impl<V: 'static> Clone for AsyncVariable<V> {
fn clone(&self) -> Self {
AsyncVariable(self.0.clone())
}
}
impl<V: 'static> Future for AsyncVariable<V> {
type Output = V;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
unsafe {
*self.0.waker.get() = Some(cx.waker().clone()); }
let mut spin_len = 1;
loop {
match self.0.status.compare_exchange(0,
1,
Ordering::Acquire,
Ordering::Relaxed) {
Err(current) if current & 4 != 0 => {
unsafe {
let _ = (&mut *self.0.waker.get()).take(); return Poll::Ready((&mut *(&self).0.value.get()).take().unwrap());
}
},
Err(_) => {
spin_len = spin(spin_len);
},
Ok(_) => {
return Poll::Pending;
},
}
}
}
}
impl<V: 'static> AsyncVariable<V> {
pub fn new() -> Self {
let inner = InnerAsyncVariable {
value: UnsafeCell::new(None),
waker: UnsafeCell::new(None),
status: AtomicU8::new(0),
};
AsyncVariable(Arc::new(inner))
}
pub fn is_complete(&self) -> bool {
self
.0
.status
.load(Ordering::Acquire) & 4 != 0
}
pub fn lock(&self) -> Option<AsyncVariableGuard<V>> {
let mut spin_len = 1;
loop {
match self
.0
.status
.compare_exchange(1,
3,
Ordering::Acquire,
Ordering::Relaxed) {
Err(0) => {
match self
.0
.status
.compare_exchange(0,
2,
Ordering::Acquire,
Ordering::Relaxed) {
Err(1) => {
continue;
},
Err(2) => {
spin_len = spin(spin_len);
},
Err(3) => {
spin_len = spin(spin_len);
},
Err(_) => {
return None;
},
Ok(_) => {
let guard = AsyncVariableGuard {
value: &self.0.value,
waker: &self.0.waker,
status: &self.0.status,
};
return Some(guard)
},
}
},
Err(2) => {
spin_len = spin(spin_len);
},
Err(3) => {
spin_len = spin(spin_len);
},
Err(_) => {
return None;
}
Ok(_) => {
let guard = AsyncVariableGuard {
value: &self.0.value,
waker: &self.0.waker,
status: &self.0.status,
};
return Some(guard)
},
}
}
}
}
pub struct InnerAsyncVariable<V: 'static> {
value: UnsafeCell<Option<V>>, waker: UnsafeCell<Option<Waker>>, status: AtomicU8, }
pub struct AsyncWaitResult<V: 'static>(pub Arc<RefCell<Option<Result<V>>>>);
unsafe impl<V: 'static> Send for AsyncWaitResult<V> {}
unsafe impl<V: 'static> Sync for AsyncWaitResult<V> {}
impl<V: 'static> Clone for AsyncWaitResult<V> {
fn clone(&self) -> Self {
AsyncWaitResult(self.0.clone())
}
}
pub struct AsyncWaitResults<V: 'static>(pub Arc<RefCell<Option<Vec<Result<V>>>>>);
unsafe impl<V: 'static> Send for AsyncWaitResults<V> {}
unsafe impl<V: 'static> Sync for AsyncWaitResults<V> {}
impl<V: 'static> Clone for AsyncWaitResults<V> {
fn clone(&self) -> Self {
AsyncWaitResults(self.0.clone())
}
}
pub enum AsyncTimingTask<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static = (),
> {
Pended(TaskId), WaitRun(Arc<AsyncTask<P, O>>), TimeoutWake(Arc<TimeoutWaiter>), }
pub struct AsyncTaskTimer<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static = (),
> {
producor: Sender<(usize, AsyncTimingTask<P, O>)>, consumer: Receiver<(usize, AsyncTimingTask<P, O>)>, timer: Arc<RefCell<Timer<AsyncTimingTask<P, O>, 1000, 60, 3>>>, clock: Clock, now: QInstant, }
unsafe impl<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Send for AsyncTaskTimer<P, O> {}
unsafe impl<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Sync for AsyncTaskTimer<P, O> {}
impl<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> AsyncTaskTimer<P, O> {
pub fn new() -> Self {
let (producor, consumer) = unbounded();
let clock = Clock::new();
let now = clock.recent();
AsyncTaskTimer {
producor,
consumer,
timer: Arc::new(RefCell::new(Timer::<AsyncTimingTask<P, O>, 1000, 60, 3>::default())),
clock,
now,
}
}
#[inline]
pub fn get_producor(&self) -> &Sender<(usize, AsyncTimingTask<P, O>)> {
&self.producor
}
#[inline]
pub fn len(&self) -> usize {
let timer = self.timer.as_ref().borrow();
timer.add_count() - timer.remove_count()
}
pub fn set_timer(&self, task: AsyncTimingTask<P, O>, timeout: usize) -> usize {
let current_time = self
.clock
.recent()
.duration_since(self.now)
.as_millis() as u64;
self
.timer
.borrow_mut()
.push_time(current_time + timeout as u64, task)
.data()
.as_ffi() as usize
}
pub fn cancel_timer(&self, timer_ref: usize) -> Option<AsyncTimingTask<P, O>> {
if let Some(item) =self
.timer
.borrow_mut()
.cancel(KeyData::from_ffi(timer_ref as u64).into()) {
Some(item)
} else {
None
}
}
pub fn consume(&self) -> usize {
let mut len = 0;
let timer_tasks = self.consumer.try_iter().collect::<Vec<(usize, AsyncTimingTask<P, O>)>>();
for (timeout, task) in timer_tasks {
self.set_timer(task, timeout);
len += 1;
}
len
}
pub fn is_require_pop(&self) -> Option<u64> {
let current_time = self
.clock
.recent()
.duration_since(self.now)
.as_millis() as u64;
if self.timer.borrow_mut().is_ok(current_time) {
Some(current_time)
} else {
None
}
}
pub fn pop(&self, current_time: u64) -> Option<(usize, AsyncTimingTask<P, O>)> {
if let Some((key, item)) = self.timer.borrow_mut().pop_kv(current_time) {
Some((key.data().as_ffi() as usize, item))
} else {
None
}
}
}
pub struct AsyncWaitTimeout<
RT: AsyncRuntime<O>,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static = (),
> {
rt: RT, producor: Sender<(usize, AsyncTimingTask<P, O>)>, timeout: usize, registered: AtomicBool, waiter: Arc<TimeoutWaiter>, }
unsafe impl<
RT: AsyncRuntime<O>,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Send for AsyncWaitTimeout<RT, P, O> {}
unsafe impl<
RT: AsyncRuntime<O>,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Sync for AsyncWaitTimeout<RT, P, O> {}
impl<
RT: AsyncRuntime<O>,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Future for AsyncWaitTimeout<RT, P, O> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.waiter.is_fired() {
return Poll::Ready(());
}
self.waiter.register(cx.waker());
if !self.registered.swap(true, Ordering::AcqRel) {
let _ = self
.producor
.send((self.timeout, AsyncTimingTask::TimeoutWake(self.waiter.clone())));
}
if self.waiter.is_fired() {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
impl<
RT: AsyncRuntime<O>,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Drop for AsyncWaitTimeout<RT, P, O> {
fn drop(&mut self) {
self.waiter.clear_waker();
}
}
impl<
RT: AsyncRuntime<O>,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> AsyncWaitTimeout<RT, P, O> {
pub fn new(rt: RT,
producor: Sender<(usize, AsyncTimingTask<P, O>)>,
timeout: usize) -> Self {
AsyncWaitTimeout {
rt,
producor,
timeout,
registered: AtomicBool::new(false), waiter: Arc::new(TimeoutWaiter::new()),
}
}
}
pub struct AsyncWait<V: 'static>(AsyncWaitAny<V>);
unsafe impl<V: 'static> Send for AsyncWait<V> {}
unsafe impl<V: 'static> Sync for AsyncWait<V> {}
impl<V: 'static> AsyncWait<V> {
pub(crate) fn new(inner: AsyncWaitAny<V>) -> Self {
AsyncWait(inner)
}
pub fn spawn<RT, O, F>(&self,
rt: RT,
timeout: Option<usize>,
future: F) -> Result<()>
where RT: AsyncRuntime<O>,
O: Default + 'static,
F: Future<Output = Result<V>> + 'static {
self.0.spawn(rt.clone(), future)?;
if let Some(timeout) = timeout {
let rt_copy = rt.clone();
self.0.spawn(rt, async move {
rt_copy.timeout(timeout).await;
Err(Error::new(ErrorKind::TimedOut, format!("Time out")))
})
} else {
Ok(())
}
}
pub fn spawn_local<O, F>(&self,
timeout: Option<usize>,
future: F) -> Result<()>
where O: Default + 'static,
F: Future<Output = Result<V>> + 'static {
if let Some(rt) = local_serial_async_runtime::<O>() {
self.0.spawn_local(future)?;
if let Some(timeout) = timeout {
let rt_copy = rt.clone();
self.0.spawn_local(async move {
rt_copy.timeout(timeout).await;
Err(Error::new(ErrorKind::TimedOut, format!("Time out")))
})
} else {
Ok(())
}
} else {
Err(Error::new(ErrorKind::Other, format!("Spawn wait task failed, reason: local async runtime not exist")))
}
}
}
impl<V: 'static> AsyncWait<V> {
pub async fn wait_result(self) -> Result<V> {
self.0.wait_result().await
}
}
pub struct AsyncWaitAny<V: 'static> {
capacity: usize, producor: AsyncSender<Result<V>>, consumer: AsyncReceiver<Result<V>>, }
unsafe impl<V: 'static> Send for AsyncWaitAny<V> {}
unsafe impl<V: 'static> Sync for AsyncWaitAny<V> {}
impl<V: 'static> AsyncWaitAny<V> {
pub(crate) fn new(capacity: usize,
producor: AsyncSender<Result<V>>,
consumer: AsyncReceiver<Result<V>>) -> Self {
AsyncWaitAny {
capacity,
producor,
consumer,
}
}
pub fn spawn<RT, O, F>(&self,
rt: RT,
future: F) -> Result<()>
where RT: AsyncRuntime<O>,
O: Default + 'static,
F: Future<Output = Result<V>> + 'static {
let producor = self.producor.clone();
rt.spawn_by_id(rt.alloc::<O>(), async move {
let value = future.await;
producor.into_send_async(value).await;
Default::default()
})
}
pub fn spawn_local<F>(&self,
future: F) -> Result<()>
where F: Future<Output = Result<V>> + 'static {
if let Some(rt) = local_serial_async_runtime() {
let producor = self.producor.clone();
rt.spawn(async move {
let value = future.await;
producor.into_send_async(value).await;
})
} else {
Err(Error::new(ErrorKind::Other, format!("Spawn wait any task failed, reason: local async runtime not exist")))
}
}
}
impl<V: 'static> AsyncWaitAny<V> {
pub async fn wait_result(self) -> Result<V> {
match self.consumer.recv_async().await {
Err(e) => {
Err(Error::new(ErrorKind::Other, format!("Wait any result failed, reason: {:?}", e)))
},
Ok(result) => {
result
},
}
}
}
pub struct AsyncWaitAnyCallback<V: 'static> {
capacity: usize, producor: AsyncSender<Result<V>>, consumer: AsyncReceiver<Result<V>>, }
unsafe impl<V: 'static> Send for AsyncWaitAnyCallback<V> {}
unsafe impl<V: 'static> Sync for AsyncWaitAnyCallback<V> {}
impl<V: 'static> AsyncWaitAnyCallback<V> {
pub(crate) fn new(capacity: usize,
producor: AsyncSender<Result<V>>,
consumer: AsyncReceiver<Result<V>>) -> Self {
AsyncWaitAnyCallback {
capacity,
producor,
consumer,
}
}
pub fn spawn<RT, O, F>(&self,
rt: RT,
future: F) -> Result<()>
where RT: AsyncRuntime<O>,
O: Default + 'static,
F: Future<Output = Result<V>> + 'static {
let producor = self.producor.clone();
rt.spawn_by_id(rt.alloc::<O>(), async move {
let value = future.await;
producor.into_send_async(value).await;
Default::default()
})
}
pub fn spawn_local<F>(&self,
future: F) -> Result<()>
where F: Future<Output = Result<V>> + 'static {
if let Some(rt) = local_serial_async_runtime() {
let producor = self.producor.clone();
rt.spawn(async move {
let value = future.await;
producor.into_send_async(value).await;
})
} else {
Err(Error::new(ErrorKind::Other, format!("Spawn wait any task failed by callback, reason: current async runtime not exist")))
}
}
}
impl<V: 'static> AsyncWaitAnyCallback<V> {
pub async fn wait_result(mut self,
callback: impl Fn(&Result<V>) -> bool + 'static) -> Result<V> {
let checker = create_checker(self.capacity, callback);
loop {
match self.consumer.recv_async().await {
Err(e) => {
return Err(Error::new(ErrorKind::Other, format!("Wait any result failed by callback, reason: {:?}", e)));
},
Ok(result) => {
if checker(&result) {
return result;
}
},
}
}
}
}
fn create_checker<V, F>(len: usize,
callback: F) -> Arc<dyn Fn(&Result<V>) -> bool + 'static>
where V: 'static,
F: Fn(&Result<V>) -> bool + 'static {
let mut check_counter = AtomicUsize::new(len); Arc::new(move |result| {
if check_counter.fetch_sub(1, Ordering::SeqCst) == 1 {
true
} else {
callback(result)
}
})
}
pub struct AsyncMapReduce<V: 'static> {
count: usize, capacity: usize, producor: AsyncSender<(usize, Result<V>)>, consumer: AsyncReceiver<(usize, Result<V>)>, }
unsafe impl<V: 'static> Send for AsyncMapReduce<V> {}
impl<V: 'static> AsyncMapReduce<V> {
pub(crate) fn new(count: usize,
capacity: usize,
producor: AsyncSender<(usize, Result<V>)>,
consumer: AsyncReceiver<(usize, Result<V>)>) -> Self {
AsyncMapReduce {
count,
capacity,
producor,
consumer,
}
}
pub fn map<RT, O, F>(&mut self, rt: RT, future: F) -> Result<usize>
where RT: AsyncRuntime<O>,
O: Default + 'static,
F: Future<Output = Result<V>> + 'static {
if self.count >= self.capacity {
return Err(Error::new(ErrorKind::Other, format!("Map task to runtime failed, capacity: {}, reason: out of capacity", self.capacity)));
}
let index = self.count;
let producor = self.producor.clone();
rt.spawn(async move {
let value = future.await;
producor.into_send_async((index, value)).await;
Default::default()
})?;
self.count += 1; Ok(index)
}
}
impl<V: 'static> AsyncMapReduce<V> {
pub async fn reduce(self, order: bool) -> Result<Vec<Result<V>>> {
let mut count = self.count;
let mut results = Vec::with_capacity(count);
while count > 0 {
match self.consumer.recv_async().await {
Err(e) => {
return Err(Error::new(ErrorKind::Other, format!("Reduce result failed, reason: {:?}", e)));
},
Ok((index, result)) => {
results.push((index, result));
count -= 1;
},
}
}
if order {
results.sort_by_key(|(key, _value)| {
key.clone()
});
}
let (_, values) = results
.into_iter()
.unzip::<usize, Result<V>, Vec<usize>, Vec<Result<V>>>();
Ok(values)
}
}
pub fn spawn_worker_thread<F0, F1>(thread_name: &str,
thread_stack_size: usize,
thread_handler: Arc<AtomicBool>,
thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>, sleep_timeout: u64, loop_interval: Option<u64>, loop_func: F0,
get_queue_len: F1) -> Arc<AtomicBool>
where F0: Fn() -> (bool, Duration) + Send + 'static,
F1: Fn() -> usize + Send + 'static {
let thread_status_copy = thread_handler.clone();
thread::Builder::new()
.name(thread_name.to_string())
.stack_size(thread_stack_size)
.spawn(move || {
let mut sleep_count = 0;
while thread_handler.load(Ordering::Relaxed) {
let (is_no_task, run_time) = loop_func();
if is_no_task {
if sleep_count > 1 {
sleep_count = 0; let (is_sleep, lock, condvar) = &*thread_waker;
if get_queue_len() > 0 {
continue;
}
{
let _locked = lock.lock();
if !is_sleep.load(Ordering::Acquire) {
is_sleep.store(true, Ordering::Release);
}
}
if get_queue_len() > 0 {
is_sleep.store(false, Ordering::Release);
continue;
}
let mut locked = lock.lock();
if is_sleep.load(Ordering::Acquire) {
let _ = condvar.wait_for(
&mut locked,
Duration::from_millis(sleep_timeout),
);
}
is_sleep.store(false, Ordering::Release);
continue; }
sleep_count += 1; if let Some(interval) = &loop_interval {
if let Some(remaining_interval) = Duration::from_millis(*interval).checked_sub(run_time){
thread::sleep(remaining_interval);
}
}
} else {
sleep_count = 0; if let Some(interval) = &loop_interval {
if let Some(remaining_interval) = Duration::from_millis(*interval).checked_sub(run_time){
thread::sleep(remaining_interval);
}
}
}
}
});
thread_status_copy
}
pub fn wakeup_worker_thread<O, P>(worker_waker: &Arc<(AtomicBool, Mutex<()>, Condvar)>,
rt: &SingleTaskRuntime<O, P>)
where O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P> {
if worker_waker.0.load(Ordering::Relaxed) && rt.len() > 0 {
let _ = wake_thread_waker(worker_waker);
}
}