use std::thread;
use std::pin::Pin;
use std::sync::Arc;
use std::ptr::null_mut;
use std::vec::IntoIter;
use std::future::Future;
use std::panic::set_hook;
use std::any::{Any, TypeId};
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::cell::{RefCell, UnsafeCell};
use std::task::{Waker, Context, Poll};
use std::time::{Duration, SystemTime};
use std::io::{Error, Result, ErrorKind};
use std::alloc::{Layout, set_alloc_error_hook};
use std::fmt::{Debug, Formatter, Result as FmtResult};
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, AtomicPtr, Ordering};
pub mod single_thread;
pub mod multi_thread;
pub mod worker_thread;
pub mod serial;
pub mod serial_local_thread;
pub mod serial_single_thread;
pub mod serial_worker_thread;
pub mod serial_local_compatible_wasm_runtime;
use libc;
use futures::{future::{FutureExt, BoxFuture},
stream::{Stream, BoxStream},
task::ArcWake};
use parking_lot::{Mutex, Condvar};
use crossbeam_channel::{Sender, Receiver, unbounded};
use crossbeam_queue::ArrayQueue;
use crossbeam_utils::atomic::AtomicCell;
use flume::{Sender as AsyncSender, Receiver as AsyncReceiver};
use num_cpus;
use backtrace::Backtrace;
use slotmap::{Key, KeyData};
use quanta::{Clock, Upkeep, Handle, Instant as QInstant};
use pi_hash::XHashMap;
use pi_cancel_timer::Timer;
use pi_timer::Timer as NotCancelTimer;
use single_thread::SingleTaskRuntime;
use worker_thread::{WorkerTaskRunner, WorkerRuntime};
use multi_thread::{MultiTaskRuntimeBuilder, MultiTaskRuntime};
use crate::lock::spin;
thread_local! {
static PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME: AtomicPtr<()> = AtomicPtr::new(null_mut());
static PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME_DICT: UnsafeCell<XHashMap<TypeId, Box<dyn Any + 'static>>> = UnsafeCell::new(XHashMap::default());
}
thread_local! {
static PI_ASYNC_THREAD_LOCAL_ID: UnsafeCell<usize> = UnsafeCell::new(usize::MAX);
}
const DEFAULT_MAX_HIGH_PRIORITY_BOUNDED: usize = 10;
const DEFAULT_HIGH_PRIORITY_BOUNDED: usize = 5;
const DEFAULT_MAX_LOW_PRIORITY_BOUNDED: usize = 0;
static RUNTIME_UID_GEN: AtomicUsize = AtomicUsize::new(1);
static GLOBAL_TIME_LOOP_STATUS: AtomicBool = AtomicBool::new(false);
pub fn startup_global_time_loop(interval: u64) -> Option<GlobalTimeLoopHandle> {
if let Err(_) = GLOBAL_TIME_LOOP_STATUS.compare_exchange(false,
true,
Ordering::AcqRel,
Ordering::Relaxed) {
None
} else {
let timer = Upkeep::new_with_clock(Duration::from_millis(interval), Clock::new());
let handle = timer.start().unwrap();
let clock = Clock::new();
let _now = clock.recent();
Some(GlobalTimeLoopHandle(handle))
}
}
pub struct GlobalTimeLoopHandle(Handle);
impl Drop for GlobalTimeLoopHandle {
fn drop(&mut self) {
GLOBAL_TIME_LOOP_STATUS.store(false, Ordering::Release);
}
}
pub fn alloc_rt_uid() -> usize {
RUNTIME_UID_GEN.fetch_add(1, Ordering::Relaxed)
}
pub struct TaskId(UnsafeCell<u128>);
impl Debug for TaskId {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
write!(f, "TaskId[inner = {}]", unsafe { *self.0.get() })
}
}
impl Clone for TaskId {
fn clone(&self) -> Self {
unsafe {
TaskId(UnsafeCell::new(*self.0.get()))
}
}
}
impl TaskId {
#[inline]
pub fn exist_waker<R: 'static>(&self) -> bool {
unsafe {
let handle = unsafe { TaskHandle::<R>::from_raw((*self.0.get() >> 64) as *const ()) };
let inner = &*handle.0;
let r = if let Some(waker) = inner.0.swap(None) {
inner.0.swap(Some(waker));
true
} else {
false
};
handle.into_raw();
r
}
}
#[inline]
pub fn wakeup<R: 'static>(&self) {
unsafe {
let handle = unsafe { TaskHandle::<R>::from_raw((*self.0.get() >> 64) as *const ()) };
let inner = &*handle.0;
if let Some(waker) = inner.0.swap(None) {
waker.wake();
}
handle.into_raw();
}
}
#[inline]
pub fn set_waker<R: 'static>(&self, waker: Waker) -> Option<Waker> {
unsafe {
let handle = unsafe { TaskHandle::<R>::from_raw((*self.0.get() >> 64) as *const ()) };
let inner = &*handle.0;
let r = inner.0.swap(Some(waker));
handle.into_raw();
r
}
}
#[inline]
pub fn result<R: 'static>(&self) -> Option<R> {
unsafe {
let handle = unsafe { TaskHandle::<R>::from_raw((*self.0.get() >> 64) as *const ()) };
let inner = &*handle.0;
let r = inner.1.swap(None);
handle.into_raw();
r
}
}
#[inline]
pub fn set_result<R: 'static>(&self, result: R) -> Option<R> {
unsafe {
let handle = unsafe { TaskHandle::<R>::from_raw((*self.0.get() >> 64) as *const ()) };
let inner = &*handle.0;
let r = inner.1.swap(Some(result));
handle.into_raw();
r
}
}
}
pub(crate) struct TaskHandle<R: 'static>(Box<(
AtomicCell<Option<Waker>>, AtomicCell<Option<R>>, )>);
impl<R: 'static> Default for TaskHandle<R> {
fn default() -> Self {
TaskHandle(Box::new((AtomicCell::new(None), AtomicCell::new(None))))
}
}
impl<R: 'static> TaskHandle<R> {
pub unsafe fn from_raw(raw: *const ()) -> TaskHandle<R> {
let inner
= Box::from_raw(raw as *const (AtomicCell<Option<Waker>>, AtomicCell<Option<R>>) as *mut (AtomicCell<Option<Waker>>, AtomicCell<Option<R>>));
TaskHandle(inner)
}
pub fn into_raw(self) -> *const () {
Box::into_raw(self.0)
as *mut (AtomicCell<Option<Waker>>, AtomicCell<Option<R>>)
as *const (AtomicCell<Option<Waker>>, AtomicCell<Option<R>>)
as *const ()
}
}
pub struct AsyncTask<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static = (),
> {
uid: TaskId, future: Mutex<Option<BoxFuture<'static, O>>>, pool: Arc<P>, priority: usize, context: Option<UnsafeCell<Box<dyn Any>>>, }
impl<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Drop for AsyncTask<P, O> {
fn drop(&mut self) {
let _ = unsafe { TaskHandle::<O>::from_raw((*self.uid.0.get() >> 64) as usize as *const ()) };
}
}
unsafe impl<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Send for AsyncTask<P, O> {}
unsafe impl<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Sync for AsyncTask<P, O> {}
impl<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
O: Default + 'static,
> ArcWake for AsyncTask<P, O> {
#[cfg(not(target_arch = "aarch64"))]
fn wake_by_ref(arc_self: &Arc<Self>) {
let pool = arc_self.get_pool();
let _ = pool.push_keep(arc_self.clone());
if let Some(waits) = pool.get_waits() {
if let Some(worker_waker) = waits.pop() {
let (is_sleep, lock, condvar) = &*worker_waker;
let _locked = lock.lock();
if is_sleep.load(Ordering::Relaxed) {
if let Ok(true) = is_sleep
.compare_exchange_weak(true,
false,
Ordering::SeqCst,
Ordering::SeqCst) {
condvar.notify_one();
}
}
}
} else {
if let Some(thread_waker) = pool.get_thread_waker() {
if thread_waker.0.load(Ordering::Relaxed) {
let (is_sleep, lock, condvar) = &**thread_waker;
let _locked = lock.lock();
if let Ok(true) = is_sleep
.compare_exchange_weak(true,
false,
Ordering::SeqCst,
Ordering::SeqCst) {
condvar.notify_one();
}
}
}
}
}
#[cfg(target_arch = "aarch64")]
fn wake_by_ref(arc_self: &Arc<Self>) {
let pool = arc_self.get_pool();
let _ = pool.push_keep(arc_self.clone());
if let Some(waits) = pool.get_waits() {
if let Some(worker_waker) = waits.pop() {
let (is_sleep, lock, condvar) = &*worker_waker;
let locked = lock.lock();
if is_sleep.load(Ordering::Relaxed) {
if let Ok(true) = is_sleep
.compare_exchange(true,
false,
Ordering::SeqCst,
Ordering::SeqCst) {
condvar.notify_one();
}
}
}
} else {
if let Some(thread_waker) = pool.get_thread_waker() {
if thread_waker.0.load(Ordering::Relaxed) {
let (is_sleep, lock, condvar) = &**thread_waker;
let locked = lock.lock();
if let Ok(true) = is_sleep
.compare_exchange(true,
false,
Ordering::SeqCst,
Ordering::SeqCst) {
condvar.notify_one();
}
}
}
}
}
}
impl<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
O: Default + 'static,
> AsyncTask<P, O> {
pub fn new(uid: TaskId,
pool: Arc<P>,
priority: usize,
future: Option<BoxFuture<'static, O>>) -> AsyncTask<P, O> {
AsyncTask {
uid,
future: Mutex::new(future),
pool,
priority,
context: None,
}
}
pub fn with_context<C: 'static>(uid: TaskId,
pool: Arc<P>,
priority: usize,
future: Option<BoxFuture<'static, O>>,
context: C) -> AsyncTask<P, O> {
let any = Box::new(context);
AsyncTask {
uid,
future: Mutex::new(future),
pool,
priority,
context: Some(UnsafeCell::new(any)),
}
}
pub fn with_runtime_and_context<RT, C>(runtime: &RT,
priority: usize,
future: Option<BoxFuture<'static, O>>,
context: C) -> AsyncTask<P, O>
where RT: AsyncRuntime<O, Pool = P>,
C: Send + 'static {
let any = Box::new(context);
AsyncTask {
uid: runtime.alloc::<O>(),
future: Mutex::new(future),
pool: runtime.shared_pool(),
priority,
context: Some(UnsafeCell::new(any)),
}
}
pub fn is_enable_wakeup(&self) -> bool {
self.uid.exist_waker::<O>()
}
pub fn get_inner(&self) -> Option<BoxFuture<'static, O>> {
self.future.lock().take()
}
pub fn set_inner(&self, inner: Option<BoxFuture<'static, O>>) {
*self.future.lock() = inner;
}
#[inline]
pub fn owner(&self) -> usize {
unsafe {
*self.uid.0.get() as usize
}
}
#[inline]
pub fn priority(&self) -> usize {
self.priority
}
pub fn exist_context(&self) -> bool {
self.context.is_some()
}
pub fn get_context<C: Send + 'static>(&self) -> Option<&C> {
if let Some(context) = &self.context {
let any = unsafe { &*context.get() };
return <dyn Any>::downcast_ref::<C>(&**any);
}
None
}
pub fn get_context_mut<C: Send + 'static>(&self) -> Option<&mut C> {
if let Some(context) = &self.context {
let any = unsafe { &mut *context.get() };
return <dyn Any>::downcast_mut::<C>(&mut **any);
}
None
}
pub fn set_context<C: Send + 'static>(&self, new: C) {
if let Some(context) = &self.context {
let _ = unsafe { &*context.get() };
let any: Box<dyn Any + 'static> = Box::new(new);
unsafe { *context.get() = any; }
}
}
pub fn get_pool(&self) -> &P {
self.pool.as_ref()
}
}
pub trait AsyncTaskPool<O: Default + 'static = ()>: Default + Send + Sync + 'static {
type Pool: AsyncTaskPoolExt<O> + AsyncTaskPool<O>;
fn get_thread_id(&self) -> usize;
fn len(&self) -> usize;
fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
fn push_priority(&self,
priority: usize,
task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>>;
fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>>;
fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>>;
}
pub trait AsyncTaskPoolExt<O: Default + 'static = ()>: Send + Sync + 'static {
fn set_waits(&mut self,
_waits: Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>) {}
fn get_waits(&self) -> Option<&Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>> {
None
}
fn idler_len(&self) -> usize {
0
}
fn spawn_worker(&self) -> Option<usize> {
None
}
fn worker_len(&self) -> usize {
#[cfg(not(target_arch = "wasm32"))]
return num_cpus::get();
#[cfg(target_arch = "wasm32")]
return 1;
}
fn buffer_len(&self) -> usize {
0
}
fn set_thread_waker(&mut self, _thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) {
}
fn clone_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
None
}
fn close_worker(&self) {
}
}
pub trait AsyncRuntime<O: Default + 'static = ()>: Clone + Send + Sync + 'static {
type Pool: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = Self::Pool>;
fn shared_pool(&self) -> Arc<Self::Pool>;
fn get_id(&self) -> usize;
fn wait_len(&self) -> usize;
fn len(&self) -> usize;
fn alloc<R: 'static>(&self) -> TaskId;
fn spawn<F>(&self, future: F) -> Result<TaskId>
where F: Future<Output = O> + Send + 'static;
fn spawn_local<F>(&self, future: F) -> Result<TaskId>
where F: Future<Output = O> + Send + 'static;
fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
where F: Future<Output = O> + Send + 'static;
fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
where F: Future<Output = O> + Send + 'static;
fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
where F: Future<Output = O> + Send + 'static;
fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
where F: Future<Output = O> + Send + 'static;
fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
where F: Future<Output = O> + Send + 'static;
fn spawn_priority_by_id<F>(&self,
task_id: TaskId,
priority: usize,
future: F) -> Result<()>
where F: Future<Output = O> + Send + 'static;
fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
where F: Future<Output = O> + Send + 'static;
fn spawn_timing_by_id<F>(&self,
task_id: TaskId,
future: F,
time: usize) -> Result<()>
where F: Future<Output = O> + Send + 'static;
fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output>;
fn wakeup<Output: 'static>(&self, task_id: &TaskId);
fn wait<V: Send + 'static>(&self) -> AsyncWait<V>;
fn wait_any<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAny<V>;
fn wait_any_callback<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V>;
fn map_reduce<V: Send + 'static>(&self, capacity: usize) -> AsyncMapReduce<V>;
fn timeout(&self, timeout: usize) -> BoxFuture<'static, ()>;
fn yield_now(&self) -> BoxFuture<'static, ()>;
fn pipeline<S, SO, F, FO>(&self, input: S, filter: F) -> BoxStream<'static, FO>
where S: Stream<Item = SO> + Send + 'static,
SO: Send + 'static,
F: FnMut(SO) -> AsyncPipelineResult<FO> + Send + 'static,
FO: Send + 'static;
fn close(&self) -> bool;
}
pub trait AsyncRuntimeExt<O: Default + 'static = ()> {
fn spawn_with_context<F, C>(&self,
task_id: TaskId,
future: F,
context: C) -> Result<()>
where F: Future<Output = O> + Send + 'static,
C: 'static;
fn spawn_timing_with_context<F, C>(&self,
task_id: TaskId,
future: F,
context: C,
time: usize) -> Result<()>
where F: Future<Output = O> + Send + 'static,
C: Send + 'static;
fn block_on<F>(&self, future: F) -> Result<F::Output>
where F: Future + Send + 'static,
<F as Future>::Output: Default + Send + 'static;
}
pub struct AsyncRuntimeBuilder<O: Default + 'static = ()>(PhantomData<O>);
impl<O: Default + 'static> AsyncRuntimeBuilder<O> {
pub fn default_worker_thread(worker_name: Option<&str>,
worker_stack_size: Option<usize>,
worker_sleep_timeout: Option<u64>,
worker_loop_interval: Option<Option<u64>>) -> WorkerRuntime<O> {
let runner = WorkerTaskRunner::default();
let thread_name = if let Some(name) = worker_name {
name
} else {
"Default-Single-Worker"
};
let thread_stack_size = if let Some(size) = worker_stack_size {
size
} else {
2 * 1024 * 1024
};
let sleep_timeout = if let Some(timeout) = worker_sleep_timeout {
timeout
} else {
1
};
let loop_interval = if let Some(interval) = worker_loop_interval {
interval
} else {
None
};
let clock = Clock::new();
let runner_copy = runner.clone();
let rt_copy = runner.get_runtime();
let rt = runner.startup(
thread_name,
thread_stack_size,
sleep_timeout,
loop_interval,
move || {
let last = clock.recent();
match runner_copy.run_once() {
Err(e) => {
panic!("Run runner failed, reason: {:?}", e);
},
Ok(len) => {
(len == 0,
clock
.recent()
.duration_since(last))
},
}
},
move || {
rt_copy.wait_len() + rt_copy.len()
},
);
rt
}
pub fn custom_worker_thread<P, F0, F1>(pool: P,
worker_handle: Arc<AtomicBool>,
worker_condvar: Arc<(AtomicBool, Mutex<()>, Condvar)>,
thread_name: &str,
thread_stack_size: usize,
sleep_timeout: u64,
loop_interval: Option<u64>,
loop_func: F0,
get_queue_len: F1) -> WorkerRuntime<O, P>
where P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
F0: Fn() -> (bool, Duration) + Send + 'static,
F1: Fn() -> usize + Send + 'static {
let runner = WorkerTaskRunner::new(pool,
worker_handle,
worker_condvar);
let rt_copy = runner.get_runtime();
let rt = runner.startup(
thread_name,
thread_stack_size,
sleep_timeout,
loop_interval,
loop_func,
move || {
rt_copy.wait_len() + get_queue_len()
},
);
rt
}
pub fn default_multi_thread(worker_prefix: Option<&str>,
worker_stack_size: Option<usize>,
worker_size: Option<usize>,
worker_sleep_timeout: Option<u64>) -> MultiTaskRuntime<O> {
let mut builder = MultiTaskRuntimeBuilder::default();
if let Some(thread_prefix) = worker_prefix {
builder = builder.thread_prefix(thread_prefix);
}
if let Some(thread_stack_size) = worker_stack_size {
builder = builder.thread_stack_size(thread_stack_size);
}
if let Some(size) = worker_size {
builder = builder
.init_worker_size(size)
.set_worker_limit(size, size);
}
if let Some(sleep_timeout) = worker_sleep_timeout {
builder = builder.set_timeout(sleep_timeout);
}
builder.build()
}
pub fn custom_multi_thread<P>(pool: P,
worker_prefix: &str,
worker_stack_size: usize,
worker_size: usize,
worker_sleep_timeout: u64,
worker_timer_interval: usize) -> MultiTaskRuntime<O, P>
where P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P> {
MultiTaskRuntimeBuilder::new(pool)
.thread_prefix(worker_prefix)
.thread_stack_size(worker_stack_size)
.init_worker_size(worker_size)
.set_worker_limit(worker_size, worker_size)
.set_timeout(worker_sleep_timeout)
.set_timer_interval(worker_timer_interval)
.build()
}
}
pub fn bind_local_thread<O: Default + 'static>(runtime: LocalAsyncRuntime<O>) {
match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |rt| {
let raw = Arc::into_raw(Arc::new(runtime)) as *mut LocalAsyncRuntime<O> as *mut ();
rt.store(raw, Ordering::Relaxed);
}) {
Err(e) => {
panic!("Bind single runtime to local thread failed, reason: {:?}", e);
},
Ok(_) => (),
}
}
pub fn unbind_local_thread() {
let _ = PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |rt| {
rt.store(null_mut(), Ordering::Relaxed);
});
}
pub struct LocalAsyncRuntime<O: Default + 'static> {
inner: *const (), get_id_func: fn(*const ()) -> usize, spawn_func: fn(*const (), BoxFuture<'static, O>) -> Result<()>, spawn_timing_func: fn(*const (), BoxFuture<'static, O>, usize) -> Result<()>, timeout_func: fn(*const (), usize) -> BoxFuture<'static, ()>, }
unsafe impl<O: Default + 'static> Send for LocalAsyncRuntime<O> {}
unsafe impl<O: Default + 'static> Sync for LocalAsyncRuntime<O> {}
impl<O: Default + 'static> LocalAsyncRuntime<O> {
pub fn new(inner: *const (),
get_id_func: fn(*const ()) -> usize,
spawn_func: fn(*const (), BoxFuture<'static, O>) -> Result<()>,
spawn_timing_func: fn(*const (), BoxFuture<'static, O>, usize) -> Result<()>,
timeout_func: fn(*const (), usize) -> BoxFuture<'static, ()>) -> Self {
LocalAsyncRuntime {
inner,
get_id_func,
spawn_func,
spawn_timing_func,
timeout_func,
}
}
#[inline]
pub fn get_id(&self) -> usize {
(self.get_id_func)(self.inner)
}
#[inline]
pub fn spawn<F>(&self, future: F) -> Result<()>
where F: Future<Output = O> + Send + 'static {
(self.spawn_func)(self.inner, async move {
future.await
}.boxed())
}
#[inline]
pub fn sapwn_timing_func<F>(&self, future: F, timeout: usize) -> Result<()>
where F: Future<Output = O> + Send + 'static {
(self.spawn_timing_func)(self.inner,
async move {
future.await
}.boxed(),
timeout)
}
#[inline]
pub fn timeout(&self, timeout: usize) -> BoxFuture<'static, ()> {
(self.timeout_func)(self.inner, timeout)
}
}
pub fn local_async_runtime<O: Default + 'static>() -> Option<Arc<LocalAsyncRuntime<O>>> {
match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |ptr| {
let raw = ptr.load(Ordering::Relaxed) as *const LocalAsyncRuntime<O>;
unsafe {
if raw.is_null() {
None
} else {
let shared: Arc<LocalAsyncRuntime<O>> = unsafe { Arc::from_raw(raw) };
let result = shared.clone();
Arc::into_raw(shared); Some(result)
}
}
}) {
Err(_) => None, Ok(rt) => rt,
}
}
pub fn spawn_local<O, F>(future: F) -> Result<()>
where O: Default + 'static,
F: Future<Output = O> + Send + 'static {
if let Some(rt) = local_async_runtime::<O>() {
rt.spawn(future)
} else {
Err(Error::new(ErrorKind::Other, format!("Spawn task to local thread failed, reason: runtime not exist")))
}
}
pub fn get_local_dict<T: 'static>() -> Option<&'static T> {
match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME_DICT.try_with(move |dict| {
unsafe {
if let Some(any) = (&*dict.get()).get(&TypeId::of::<T>()) {
<dyn Any>::downcast_ref::<T>(&**any)
} else {
None
}
}
}) {
Err(_) => {
None
},
Ok(result) => {
result
}
}
}
pub fn get_local_dict_mut<T: 'static>() -> Option<&'static mut T> {
match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME_DICT.try_with(move |dict| {
unsafe {
if let Some(any) = (&mut *dict.get()).get_mut(&TypeId::of::<T>()) {
<dyn Any>::downcast_mut::<T>(&mut **any)
} else {
None
}
}
}) {
Err(_) => {
None
},
Ok(result) => {
result
}
}
}
pub fn set_local_dict<T: 'static>(value: T) -> Option<T> {
match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME_DICT.try_with(move |dict| {
unsafe {
let result = if let Some(any) = (&mut *dict.get()).remove(&TypeId::of::<T>()) {
if let Ok(r) = any.downcast() {
Some(*r)
} else {
None
}
} else {
None
};
(&mut *dict.get()).insert(TypeId::of::<T>(), Box::new(value) as Box<dyn Any>);
result
}
}) {
Err(_) => {
None
},
Ok(result) => {
result
}
}
}
pub fn remove_local_dict<T: 'static>() -> Option<T> {
match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME_DICT.try_with(move |dict| {
unsafe {
if let Some(any) = (&mut *dict.get()).remove(&TypeId::of::<T>()) {
if let Ok(r) = any.downcast() {
Some(*r)
} else {
None
}
} else {
None
}
}
}) {
Err(_) => {
None
},
Ok(result) => {
result
}
}
}
pub fn clear_local_dict() -> Result<()> {
match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME_DICT.try_with(move |dict| {
unsafe {
(&mut *dict.get()).clear();
}
}) {
Err(e) => {
Err(Error::new(ErrorKind::Other, format!("Clear local dict failed, reason: {:?}", e)))
},
Ok(_) => {
Ok(())
}
}
}
pub struct AsyncValue<V: Send + 'static>(Arc<InnerAsyncValue<V>>);
unsafe impl<V: Send + 'static> Send for AsyncValue<V> {}
unsafe impl<V: Send + 'static> Sync for AsyncValue<V> {}
impl<V: Send + 'static> Clone for AsyncValue<V> {
fn clone(&self) -> Self {
AsyncValue(self.0.clone())
}
}
impl<V: Send + 'static> Debug for AsyncValue<V> {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f,
"AsyncValue[status = {}]",
self.0.status.load(Ordering::Acquire))
}
}
impl<V: Send + 'static> Future for AsyncValue<V> {
type Output = V;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut spin_len = 1;
while self.0.status.load(Ordering::Acquire) == 2 {
spin_len = spin(spin_len);
}
if self.0.status.load(Ordering::Acquire) == 3 {
if let Some(value) = unsafe { (*(&self).0.value.get()).take() } {
return Poll::Ready(value);
}
}
unsafe {
*self.0.waker.get() = Some(cx.waker().clone()); }
let mut spin_len = 1;
loop {
match self.0.status.compare_exchange(0,
1, Ordering::Acquire,
Ordering::Relaxed) {
Err(2) => {
spin_len = spin(spin_len);
continue;
},
Err(3) => {
let value = unsafe { (*(&self).0.value.get()).take().unwrap() };
return Poll::Ready(value);
},
Err(_) => {
unimplemented!();
},
Ok(_) => {
return Poll::Pending;
},
}
}
}
}
impl<V: Send + 'static> AsyncValue<V> {
pub fn new() -> Self {
let inner = InnerAsyncValue {
value: UnsafeCell::new(None),
waker: UnsafeCell::new(None),
status: AtomicU8::new(0),
};
AsyncValue(Arc::new(inner))
}
pub fn is_complete(&self) -> bool {
self
.0
.status
.load(Ordering::Relaxed) == 3
}
pub fn set(self, value: V) {
loop {
match self.0.status.compare_exchange(1,
2,
Ordering::Acquire,
Ordering::Relaxed) {
Err(0) => {
match self.0.status.compare_exchange(0,
2,
Ordering::Acquire,
Ordering::Relaxed) {
Err(1) => {
continue;
},
Err(_) => {
return;
},
Ok(_) => {
unsafe { *self.0.value.get() = Some(value); }
self.0.status.store(3, Ordering::Release);
return;
}
}
},
Err(_) => {
return;
},
Ok(_) => {
break;
}
}
}
unsafe { *self.0.value.get() = Some(value); }
self.0.status.store(3, Ordering::Release);
let waker = unsafe { (*self.0.waker.get()).take().unwrap() };
waker.wake();
}
}
pub struct InnerAsyncValue<V: Send + 'static> {
value: UnsafeCell<Option<V>>, waker: UnsafeCell<Option<Waker>>, status: AtomicU8, }
pub struct AsyncVariableGuard<'a, V: Send + 'static> {
value: &'a UnsafeCell<Option<V>>, waker: &'a UnsafeCell<Option<Waker>>, status: &'a AtomicU8, }
unsafe impl<V: Send + 'static> Send for AsyncVariableGuard<'_, V> {}
impl<V: Send + 'static> Drop for AsyncVariableGuard<'_, V> {
fn drop(&mut self) {
self.status.fetch_sub(2, Ordering::Relaxed);
}
}
impl<V: Send + 'static> Deref for AsyncVariableGuard<'_, V> {
type Target = Option<V>;
fn deref(&self) -> &Self::Target {
unsafe {
&*self.value.get()
}
}
}
impl<V: Send + 'static> DerefMut for AsyncVariableGuard<'_, V> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe {
&mut *self.value.get()
}
}
}
impl<V: Send + 'static> AsyncVariableGuard<'_, V> {
pub fn finish(self) {
if self.status.fetch_add(4, Ordering::Relaxed) == 3 {
if let Some(waker) = unsafe { (&mut *self.waker.get()).take() } {
waker.wake();
}
}
}
}
pub struct AsyncVariable<V: Send + 'static>(Arc<InnerAsyncVariable<V>>);
unsafe impl<V: Send + 'static> Send for AsyncVariable<V> {}
unsafe impl<V: Send + 'static> Sync for AsyncVariable<V> {}
impl<V: Send + 'static> Clone for AsyncVariable<V> {
fn clone(&self) -> Self {
AsyncVariable(self.0.clone())
}
}
impl<V: Send + 'static> Future for AsyncVariable<V> {
type Output = V;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
unsafe {
*self.0.waker.get() = Some(cx.waker().clone()); }
let mut spin_len = 1;
loop {
match self.0.status.compare_exchange(0,
1,
Ordering::Acquire,
Ordering::Relaxed) {
Err(current) if current & 4 != 0 => {
unsafe {
let _ = (&mut *self.0.waker.get()).take(); return Poll::Ready((&mut *(&self).0.value.get()).take().unwrap());
}
},
Err(_) => {
spin_len = spin(spin_len);
},
Ok(_) => {
return Poll::Pending;
},
}
}
}
}
impl<V: Send + 'static> AsyncVariable<V> {
pub fn new() -> Self {
let inner = InnerAsyncVariable {
value: UnsafeCell::new(None),
waker: UnsafeCell::new(None),
status: AtomicU8::new(0),
};
AsyncVariable(Arc::new(inner))
}
pub fn is_complete(&self) -> bool {
self
.0
.status
.load(Ordering::Acquire) & 4 != 0
}
pub fn lock(&self) -> Option<AsyncVariableGuard<V>> {
let mut spin_len = 1;
loop {
match self
.0
.status
.compare_exchange(1,
3,
Ordering::Acquire,
Ordering::Relaxed) {
Err(0) => {
match self
.0
.status
.compare_exchange(0,
2,
Ordering::Acquire,
Ordering::Relaxed) {
Err(1) => {
continue;
},
Err(2) => {
spin_len = spin(spin_len);
},
Err(3) => {
spin_len = spin(spin_len);
},
Err(_) => {
return None;
},
Ok(_) => {
let guard = AsyncVariableGuard {
value: &self.0.value,
waker: &self.0.waker,
status: &self.0.status,
};
return Some(guard)
},
}
},
Err(2) => {
spin_len = spin(spin_len);
},
Err(3) => {
spin_len = spin(spin_len);
},
Err(_) => {
return None;
}
Ok(_) => {
let guard = AsyncVariableGuard {
value: &self.0.value,
waker: &self.0.waker,
status: &self.0.status,
};
return Some(guard)
},
}
}
}
}
pub struct InnerAsyncVariable<V: Send + 'static> {
value: UnsafeCell<Option<V>>, waker: UnsafeCell<Option<Waker>>, status: AtomicU8, }
pub struct AsyncWaitResult<V: Send + 'static>(pub Arc<RefCell<Option<Result<V>>>>);
unsafe impl<V: Send + 'static> Send for AsyncWaitResult<V> {}
unsafe impl<V: Send + 'static> Sync for AsyncWaitResult<V> {}
impl<V: Send + 'static> Clone for AsyncWaitResult<V> {
fn clone(&self) -> Self {
AsyncWaitResult(self.0.clone())
}
}
pub struct AsyncWaitResults<V: Send + 'static>(pub Arc<RefCell<Option<Vec<Result<V>>>>>);
unsafe impl<V: Send + 'static> Send for AsyncWaitResults<V> {}
unsafe impl<V: Send + 'static> Sync for AsyncWaitResults<V> {}
impl<V: Send + 'static> Clone for AsyncWaitResults<V> {
fn clone(&self) -> Self {
AsyncWaitResults(self.0.clone())
}
}
pub enum AsyncTimingTask<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static = (),
> {
Pended(TaskId), WaitRun(Arc<AsyncTask<P, O>>), }
pub struct AsyncTaskTimer<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static = (),
> {
producor: Sender<(usize, AsyncTimingTask<P, O>)>, consumer: Receiver<(usize, AsyncTimingTask<P, O>)>, timer: Arc<RefCell<Timer<AsyncTimingTask<P, O>, 1000, 60, 3>>>, clock: Clock, now: QInstant, }
unsafe impl<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Send for AsyncTaskTimer<P, O> {}
unsafe impl<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Sync for AsyncTaskTimer<P, O> {}
impl<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> AsyncTaskTimer<P, O> {
pub fn new() -> Self {
let (producor, consumer) = unbounded();
let clock = Clock::new();
let now = clock.recent();
AsyncTaskTimer {
producor,
consumer,
timer: Arc::new(RefCell::new(Timer::<AsyncTimingTask<P, O>, 1000, 60, 3>::default())),
clock,
now,
}
}
#[inline]
pub fn get_producor(&self) -> &Sender<(usize, AsyncTimingTask<P, O>)> {
&self.producor
}
#[inline]
pub fn len(&self) -> usize {
let timer = self.timer.as_ref().borrow();
timer.add_count() - timer.remove_count()
}
pub fn set_timer(&self, task: AsyncTimingTask<P, O>, timeout: usize) -> usize {
self
.timer
.borrow_mut()
.push(timeout, task)
.data()
.as_ffi() as usize
}
pub fn cancel_timer(&self, timer_ref: usize) -> Option<AsyncTimingTask<P, O>> {
if let Some(item) = self
.timer
.borrow_mut()
.cancel(KeyData::from_ffi(timer_ref as u64).into()) {
Some(item)
} else {
None
}
}
pub fn consume(&self) -> usize {
let timer_tasks = self.consumer.try_iter().collect::<Vec<(usize, AsyncTimingTask<P, O>)>>();
let len = timer_tasks.len();
for (timeout, task) in timer_tasks {
self.set_timer(task, timeout);
}
len
}
pub fn is_require_pop(&self) -> Option<u64> {
let current_time = self
.clock
.recent()
.duration_since(self.now)
.as_millis() as u64;
if self.timer.borrow_mut().is_ok(current_time) {
Some(current_time)
} else {
None
}
}
pub fn pop(&self, current_time: u64) -> Option<(usize, AsyncTimingTask<P, O>)> {
if let Some((key, item)) = self.timer.borrow_mut().pop_kv(current_time) {
Some((key.data().as_ffi() as usize, item))
} else {
None
}
}
}
pub struct AsyncTaskTimerByNotCancel<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static = (),
> {
producor: Sender<(usize, AsyncTimingTask<P, O>)>, consumer: Receiver<(usize, AsyncTimingTask<P, O>)>, timer: Arc<RefCell<NotCancelTimer<AsyncTimingTask<P, O>, 1000, 60, 3>>>, clock: Clock, now: QInstant, }
unsafe impl<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Send for AsyncTaskTimerByNotCancel<P, O> {}
unsafe impl<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Sync for AsyncTaskTimerByNotCancel<P, O> {}
impl<
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> AsyncTaskTimerByNotCancel<P, O> {
pub fn new() -> Self {
let (producor, consumer) = unbounded();
let clock = Clock::new();
let now = clock.recent();
AsyncTaskTimerByNotCancel {
producor,
consumer,
timer: Arc::new(RefCell::new(NotCancelTimer::<AsyncTimingTask<P, O>, 1000, 60, 3>::default())),
clock,
now,
}
}
#[inline]
pub fn get_producor(&self) -> &Sender<(usize, AsyncTimingTask<P, O>)> {
&self.producor
}
#[inline]
pub fn len(&self) -> usize {
let timer = self.timer.as_ref().borrow();
timer.add_count() - timer.remove_count()
}
pub fn set_timer(&self, task: AsyncTimingTask<P, O>, timeout: usize) {
self
.timer
.borrow_mut()
.push(timeout, task);
}
pub fn consume(&self) -> usize {
let timer_tasks = self.consumer.try_iter().collect::<Vec<(usize, AsyncTimingTask<P, O>)>>();
let len = timer_tasks.len();
for (timeout, task) in timer_tasks {
self.set_timer(task, timeout);
}
len
}
pub fn is_require_pop(&self) -> Option<u64> {
let current_time = self
.clock
.recent()
.duration_since(self.now)
.as_millis() as u64;
if self.timer.borrow_mut().is_ok(current_time) {
Some(current_time)
} else {
None
}
}
pub fn pop(&self, current_time: u64) -> Option<AsyncTimingTask<P, O>> {
if let Some(item) = self.timer.borrow_mut().pop(current_time) {
Some(item)
} else {
None
}
}
}
pub struct AsyncWaitTimeout<
RT: AsyncRuntime<O>,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static = (),
> {
rt: RT, producor: Sender<(usize, AsyncTimingTask<P, O>)>, timeout: usize, expired: AtomicBool, }
unsafe impl<
RT: AsyncRuntime<O>,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Send for AsyncWaitTimeout<RT, P, O> {}
unsafe impl<
RT: AsyncRuntime<O>,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Sync for AsyncWaitTimeout<RT, P, O> {}
impl<
RT: AsyncRuntime<O>,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Future for AsyncWaitTimeout<RT, P, O> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if (&self).expired.load(Ordering::Relaxed) {
return Poll::Ready(());
} else {
(&self).expired.store(true, Ordering::Relaxed);
}
let task_id = self.rt.alloc::<O>();
let reply = self.rt.pending(&task_id, cx.waker().clone());
let r = (&self).producor.send(((&self).timeout, AsyncTimingTask::Pended(task_id.clone())));
reply
}
}
impl<
RT: AsyncRuntime<O>,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> AsyncWaitTimeout<RT, P, O> {
pub fn new(rt: RT,
producor: Sender<(usize, AsyncTimingTask<P, O>)>,
timeout: usize) -> Self {
AsyncWaitTimeout {
rt,
producor,
timeout,
expired: AtomicBool::new(false), }
}
}
pub struct LocalAsyncWaitTimeout<
RT: AsyncRuntime<O>,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static = (),
> {
rt: RT, timer: Arc<AsyncTaskTimerByNotCancel<P, O>>, timeout: usize, expired: AtomicBool, }
unsafe impl<
RT: AsyncRuntime<O>,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Send for LocalAsyncWaitTimeout<RT, P, O> {}
unsafe impl<
RT: AsyncRuntime<O>,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Sync for LocalAsyncWaitTimeout<RT, P, O> {}
impl<
RT: AsyncRuntime<O>,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> Future for LocalAsyncWaitTimeout<RT, P, O> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if (&self).expired.load(Ordering::Relaxed) {
return Poll::Ready(());
} else {
(&self).expired.store(true, Ordering::Relaxed);
}
let task_id = self.rt.alloc::<O>();
let reply = self.rt.pending(&task_id, cx.waker().clone());
(&self)
.timer
.set_timer(AsyncTimingTask::Pended(task_id.clone()),
(&self).timeout);
reply
}
}
impl<
RT: AsyncRuntime<O>,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
O: Default + 'static,
> LocalAsyncWaitTimeout<RT, P, O> {
pub fn new(rt: RT,
timer: Arc<AsyncTaskTimerByNotCancel<P, O>>,
timeout: usize) -> Self {
LocalAsyncWaitTimeout {
rt,
timer,
timeout,
expired: AtomicBool::new(false), }
}
}
pub struct AsyncWait<V: Send + 'static>(AsyncWaitAny<V>);
unsafe impl<V: Send + 'static> Send for AsyncWait<V> {}
unsafe impl<V: Send + 'static> Sync for AsyncWait<V> {}
impl<V: Send + 'static> AsyncWait<V> {
pub fn spawn<RT, O, F>(&self,
rt: RT,
timeout: Option<usize>,
future: F) -> Result<()>
where RT: AsyncRuntime<O>,
O: Default + 'static,
F: Future<Output = Result<V>> + Send + 'static {
self.0.spawn(rt.clone(), future)?;
if let Some(timeout) = timeout {
let rt_copy = rt.clone();
self.0.spawn(rt, async move {
rt_copy.timeout(timeout).await;
Err(Error::new(ErrorKind::TimedOut, format!("Time out")))
})
} else {
Ok(())
}
}
pub fn spawn_local<O, F>(&self,
timeout: Option<usize>,
future: F) -> Result<()>
where O: Default + 'static,
F: Future<Output = Result<V>> + Send + 'static {
if let Some(rt) = local_async_runtime::<O>() {
self.0.spawn_local(future)?;
if let Some(timeout) = timeout {
let rt_copy = rt.clone();
self.0.spawn_local(async move {
rt_copy.timeout(timeout).await;
Err(Error::new(ErrorKind::TimedOut, format!("Time out")))
})
} else {
Ok(())
}
} else {
Err(Error::new(ErrorKind::Other, format!("Spawn wait task failed, reason: local async runtime not exist")))
}
}
}
impl<V: Send + 'static> AsyncWait<V> {
pub async fn wait_result(self) -> Result<V> {
self.0.wait_result().await
}
}
pub struct AsyncWaitAny<V: Send + 'static> {
capacity: usize, producor: AsyncSender<Result<V>>, consumer: AsyncReceiver<Result<V>>, }
unsafe impl<V: Send + 'static> Send for AsyncWaitAny<V> {}
unsafe impl<V: Send + 'static> Sync for AsyncWaitAny<V> {}
impl<V: Send + 'static> AsyncWaitAny<V> {
pub fn spawn<RT, O, F>(&self,
rt: RT,
future: F) -> Result<()>
where RT: AsyncRuntime<O>,
O: Default + 'static,
F: Future<Output = Result<V>> + Send + 'static {
let producor = self.producor.clone();
rt.spawn_by_id(rt.alloc::<O>(), async move {
let value = future.await;
producor.into_send_async(value).await;
Default::default()
})
}
pub fn spawn_local<F>(&self,
future: F) -> Result<()>
where F: Future<Output = Result<V>> + Send + 'static {
if let Some(rt) = local_async_runtime() {
let producor = self.producor.clone();
rt.spawn(async move {
let value = future.await;
producor.into_send_async(value).await;
})
} else {
Err(Error::new(ErrorKind::Other, format!("Spawn wait any task failed, reason: local async runtime not exist")))
}
}
}
impl<V: Send + 'static> AsyncWaitAny<V> {
pub async fn wait_result(self) -> Result<V> {
match self.consumer.recv_async().await {
Err(e) => {
Err(Error::new(ErrorKind::Other, format!("Wait any result failed, reason: {:?}", e)))
},
Ok(result) => {
result
},
}
}
}
pub struct AsyncWaitAnyCallback<V: Send + 'static> {
capacity: usize, producor: AsyncSender<Result<V>>, consumer: AsyncReceiver<Result<V>>, }
unsafe impl<V: Send + 'static> Send for AsyncWaitAnyCallback<V> {}
unsafe impl<V: Send + 'static> Sync for AsyncWaitAnyCallback<V> {}
impl<V: Send + 'static> AsyncWaitAnyCallback<V> {
pub fn spawn<RT, O, F>(&self,
rt: RT,
future: F) -> Result<()>
where RT: AsyncRuntime<O>,
O: Default + 'static,
F: Future<Output = Result<V>> + Send + 'static {
let producor = self.producor.clone();
rt.spawn_by_id(rt.alloc::<O>(), async move {
let value = future.await;
producor.into_send_async(value).await;
Default::default()
})
}
pub fn spawn_local<F>(&self,
future: F) -> Result<()>
where F: Future<Output = Result<V>> + Send + 'static {
if let Some(rt) = local_async_runtime() {
let producor = self.producor.clone();
rt.spawn(async move {
let value = future.await;
producor.into_send_async(value).await;
})
} else {
Err(Error::new(ErrorKind::Other, format!("Spawn wait any task failed by callback, reason: current async runtime not exist")))
}
}
}
impl<V: Send + 'static> AsyncWaitAnyCallback<V> {
pub async fn wait_result(mut self,
callback: impl Fn(&Result<V>) -> bool + Send + Sync + 'static) -> Result<V> {
let checker = create_checker(self.capacity, callback);
loop {
match self.consumer.recv_async().await {
Err(e) => {
return Err(Error::new(ErrorKind::Other, format!("Wait any result failed by callback, reason: {:?}", e)));
},
Ok(result) => {
if checker(&result) {
return result;
}
},
}
}
}
}
fn create_checker<V, F>(len: usize,
callback: F) -> Arc<dyn Fn(&Result<V>) -> bool + Send + Sync + 'static>
where V: Send + 'static,
F: Fn(&Result<V>) -> bool + Send + Sync + 'static {
let mut check_counter = AtomicUsize::new(len); Arc::new(move |result| {
if check_counter.fetch_sub(1, Ordering::SeqCst) == 1 {
true
} else {
callback(result)
}
})
}
pub struct AsyncMapReduce<V: Send + 'static> {
count: usize, capacity: usize, producor: AsyncSender<(usize, Result<V>)>, consumer: AsyncReceiver<(usize, Result<V>)>, }
unsafe impl<V: Send + 'static> Send for AsyncMapReduce<V> {}
impl<V: Send + 'static> AsyncMapReduce<V> {
pub fn map<RT, O, F>(&mut self, rt: RT, future: F) -> Result<usize>
where RT: AsyncRuntime<O>,
O: Default + 'static,
F: Future<Output = Result<V>> + Send + 'static {
if self.count >= self.capacity {
return Err(Error::new(ErrorKind::Other, format!("Map task to runtime failed, capacity: {}, reason: out of capacity", self.capacity)));
}
let index = self.count;
let producor = self.producor.clone();
rt.spawn_by_id(rt.alloc::<O>(), async move {
let value = future.await;
producor.into_send_async((index, value)).await;
Default::default()
})?;
self.count += 1; Ok(index)
}
}
impl<V: Send + 'static> AsyncMapReduce<V> {
pub async fn reduce(self, order: bool) -> Result<Vec<Result<V>>> {
let mut count = self.count;
let mut results = Vec::with_capacity(count);
while count > 0 {
match self.consumer.recv_async().await {
Err(e) => {
return Err(Error::new(ErrorKind::Other, format!("Reduce result failed, reason: {:?}", e)));
},
Ok((index, result)) => {
results.push((index, result));
count -= 1;
},
}
}
if order {
results.sort_by_key(|(key, _value)| {
key.clone()
});
}
let (_, values) = results
.into_iter()
.unzip::<usize, Result<V>, Vec<usize>, Vec<Result<V>>>();
Ok(values)
}
}
pub enum AsyncPipelineResult<O: 'static> {
Disconnect, Filtered(O), }
pub fn spawn_worker_thread<F0, F1>(thread_name: &str,
thread_stack_size: usize,
thread_handler: Arc<AtomicBool>,
thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>, sleep_timeout: u64, loop_interval: Option<u64>, loop_func: F0,
get_queue_len: F1) -> Arc<AtomicBool>
where F0: Fn() -> (bool, Duration) + Send + 'static,
F1: Fn() -> usize + Send + 'static {
let thread_status_copy = thread_handler.clone();
thread::Builder::new()
.name(thread_name.to_string())
.stack_size(thread_stack_size).spawn(move || {
let mut sleep_count = 0;
while thread_handler.load(Ordering::Relaxed) {
let (is_no_task, run_time) = loop_func();
if is_no_task {
if sleep_count > 1 {
sleep_count = 0; let (is_sleep, lock, condvar) = &*thread_waker;
let mut locked = lock.lock();
if get_queue_len() > 0 {
continue;
}
if !is_sleep.load(Ordering::Relaxed) {
is_sleep.store(true, Ordering::SeqCst);
if condvar
.wait_for(
&mut locked,
Duration::from_millis(sleep_timeout),
)
.timed_out()
{
is_sleep.store(false, Ordering::SeqCst);
}
}
continue; }
sleep_count += 1; if let Some(interval) = &loop_interval {
if let Some(remaining_interval) = Duration::from_millis(*interval).checked_sub(run_time){
thread::sleep(remaining_interval);
}
}
} else {
sleep_count = 0; if let Some(interval) = &loop_interval {
if let Some(remaining_interval) = Duration::from_millis(*interval).checked_sub(run_time){
thread::sleep(remaining_interval);
}
}
}
}
});
thread_status_copy
}
pub fn wakeup_worker_thread<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>(worker_waker: &Arc<(AtomicBool, Mutex<()>, Condvar)>, rt: &SingleTaskRuntime<O, P>) {
if worker_waker.0.load(Ordering::Relaxed) && rt.len() > 0 {
let (is_sleep, lock, condvar) = &**worker_waker;
let _locked = lock.lock();
is_sleep.store(false, Ordering::SeqCst); let _ = condvar.notify_one();
}
}
pub fn register_global_panic_handler<Handler>(handler: Handler)
where Handler: Fn(thread::Thread, String, Option<String>, Option<(String, u32, u32)>) -> Option<i32> + Send + Sync + 'static {
set_hook(Box::new(move |panic_info| {
let thread_info = thread::current();
let payload = panic_info.payload();
let payload_info = match payload.downcast_ref::<&str>() {
None => {
match payload.downcast_ref::<String>() {
None => {
"Unknow panic".to_string()
},
Some(info) => {
info.clone()
}
}
},
Some(info) => {
info.to_string()
}
};
let other_info = if let Some(arg) = panic_info.message() {
if let Some(s) = arg.as_str() {
Some(s.to_string())
} else {
None
}
} else {
None
};
let location = if let Some(location) = panic_info.location() {
Some((location.file().to_string(), location.line(), location.column()))
} else {
None
};
if let Some(exit_code) = handler(thread_info, payload_info, other_info, location) {
std::process::exit(exit_code);
}
}));
}
pub fn replace_global_alloc_error_handler() {
set_alloc_error_hook(global_alloc_error_handle);
}
fn global_alloc_error_handle(layout: Layout) {
let bt = Backtrace::new();
eprintln!("[UTC: {}][Thread: {}]Global memory allocation of {:?} bytes failed, stacktrace: \n{:?}",
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis(),
thread::current().name().unwrap_or(""),
layout.size(),
bt);
}
pub(crate) struct YieldNow(bool);
impl Future for YieldNow {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.0 {
Poll::Ready(())
} else {
self.0 = true;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}