use std::any::Any;
use std::vec::IntoIter;
use std::future::Future;
use std::mem::transmute;
use std::time::Duration;
use std::cell::UnsafeCell;
use std::sync::{Arc, Weak};
use std::marker::PhantomData;
use std::thread::{self, Builder};
use std::task::{Waker, Context, Poll};
use std::io::{Error, Result, ErrorKind};
use std::sync::atomic::{AtomicBool, AtomicUsize, AtomicPtr, Ordering};
use parking_lot::{Mutex, RwLock, Condvar};
use crossbeam_channel::{Sender, bounded, unbounded};
use crossbeam_deque::{Injector, Stealer, Steal, Worker};
use crossbeam_queue::{ArrayQueue, SegQueue};
use flume::bounded as async_bounded;
use futures::{future::{FutureExt, BoxFuture},
stream::{Stream, StreamExt, BoxStream},
task::{ArcWake, waker_ref}, TryFuture};
use async_stream::stream;
use num_cpus;
use log::{debug, warn};
use pi_time::Instant;
use super::{PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME,
AsyncTaskPool,
AsyncTaskPoolExt,
AsyncRuntimeExt,
TaskId,
AsyncTask,
AsyncRuntime,
AsyncTimingTask,
AsyncTaskTimer,
AsyncWaitTimeout,
AsyncWait,
AsyncWaitAny,
AsyncWaitAnyCallback,
AsyncMapReduce,
AsyncPipelineResult,
LocalAsyncRuntime,
alloc_rt_uid, local_async_runtime};
#[cfg(not(target_arch = "wasm32"))]
const DEFAULT_INIT_WORKER_SIZE: usize = 2;
#[cfg(target_arch = "wasm32")]
const DEFAULT_INIT_WORKER_SIZE: usize = 1;
const DEFAULT_WORKER_THREAD_PREFIX: &str = "Default-Multi-RT";
const DEFAULT_THREAD_STACK_SIZE: usize = 1024 * 1024;
const DEFAULT_WORKER_THREAD_SLEEP_TIME: u64 = 10;
const DEFAULT_RUNTIME_SLEEP_TIME: u64 = 1000;
thread_local! {
static PI_ASYNC_THREAD_LOCAL_ID: UnsafeCell<usize> = UnsafeCell::new(0);
}
struct ComputationalTaskQueue<O: Default + 'static> {
stack: Worker<Arc<AsyncTask<ComputationalTaskPool<O>, O>>>, queue: SegQueue<Arc<AsyncTask<ComputationalTaskPool<O>, O>>>, thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>, }
impl<O: Default + 'static> ComputationalTaskQueue<O> {
pub fn new(thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) -> Self {
let stack = Worker::new_lifo();
let queue = SegQueue::new();
ComputationalTaskQueue {
stack,
queue,
thread_waker
}
}
pub fn len(&self) -> usize {
self.stack.len() + self.queue.len()
}
}
pub struct ComputationalTaskPool<O: Default + 'static> {
workers: Vec<ComputationalTaskQueue<O>>, waits: Option<Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>>, consume_count: Arc<AtomicUsize>, produce_count: Arc<AtomicUsize>, }
unsafe impl<O: Default + 'static> Send for ComputationalTaskPool<O> {}
unsafe impl<O: Default + 'static> Sync for ComputationalTaskPool<O> {}
impl<O: Default + 'static> Default for ComputationalTaskPool<O> {
fn default() -> Self {
#[cfg(not(target_arch = "wasm32"))]
let core_len = num_cpus::get(); #[cfg(target_arch = "wasm32")]
let core_len = 1; ComputationalTaskPool::new(core_len)
}
}
impl<O: Default + 'static> AsyncTaskPool<O> for ComputationalTaskPool<O> {
type Pool = ComputationalTaskPool<O>;
#[inline]
fn get_thread_id(&self) -> usize {
match PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
unsafe { *thread_id.get() }
}) {
Err(e) => {
panic!("Get thread id failed, thread: {:?}, reason: {:?}", thread::current(), e);
},
Ok(id) => {
id
}
}
}
#[inline]
fn len(&self) -> usize {
if let Some(len) = self
.produce_count
.load(Ordering::Relaxed)
.checked_sub(self.consume_count.load(Ordering::Relaxed)) {
len
} else {
0
}
}
#[inline]
fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
let index = self.produce_count.fetch_add(1, Ordering::Relaxed) % self.workers.len();
self.workers[index].queue.push(task);
Ok(())
}
#[inline]
fn push_timed_out(&self, _index: u64, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
let id = self.get_thread_id();
let worker = &self.workers[id];
worker.stack.push(task);
self.produce_count.fetch_add(1, Ordering::Relaxed);
return Ok(());
}
#[inline]
fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
self.push(task)
}
#[inline]
fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>> {
let id = self.get_thread_id();
let worker = &self.workers[id];
let task = worker.stack.pop();
if task.is_some() {
self.consume_count.fetch_add(1, Ordering::Relaxed);
return task;
}
let task = worker.queue.pop();
if task.is_some() {
self.consume_count.fetch_add(1, Ordering::Relaxed);
}
task
}
#[inline]
fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>> {
let mut tasks = Vec::with_capacity(self.len());
while let Some(task) = self.try_pop() {
tasks.push(task);
}
tasks.into_iter()
}
#[inline]
fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>> {
None
}
}
impl<O: Default + 'static> AsyncTaskPoolExt<O> for ComputationalTaskPool<O> {
#[inline]
fn set_waits(&mut self,
waits: Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>) {
self.waits = Some(waits);
}
#[inline]
fn get_waits(&self) -> Option<&Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>> {
self.waits.as_ref()
}
#[inline]
fn worker_len(&self) -> usize {
self.workers.len()
}
#[inline]
fn clone_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
let thread_id = self.get_thread_id();
let worker = &self.workers[thread_id];
Some(worker.thread_waker.clone())
}
}
impl<O: Default + 'static> ComputationalTaskPool<O> {
pub fn new(mut size: usize) -> Self {
if size < DEFAULT_INIT_WORKER_SIZE {
size = DEFAULT_INIT_WORKER_SIZE;
}
let mut workers = Vec::with_capacity(size);
for _ in 0..size {
let thread_waker = Arc::new((AtomicBool::new(false), Mutex::new(()), Condvar::new()));
let worker = ComputationalTaskQueue::new(thread_waker);
workers.push(worker);
}
let consume_count = Arc::new(AtomicUsize::new(0));
let produce_count = Arc::new(AtomicUsize::new(0));
ComputationalTaskPool {
workers,
waits: None,
consume_count,
produce_count,
}
}
}
struct StealableTaskQueue<O: Default + 'static> {
stack: Worker<Arc<AsyncTask<StealableTaskPool<O>, O>>>, queue: Worker<Arc<AsyncTask<StealableTaskPool<O>, O>>>, thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>, }
impl<O: Default + 'static> StealableTaskQueue<O> {
pub fn new(thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>)
-> (Self, Stealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>) {
let stack = Worker::new_lifo();
let queue = Worker::new_fifo();
let stealer = queue.stealer();
(StealableTaskQueue {
stack,
queue,
thread_waker
}, stealer)
}
pub fn len(&self) -> usize {
self.stack.len() + self.queue.len()
}
}
pub struct StealableTaskPool<O: Default + 'static> {
public: Injector<Arc<AsyncTask<StealableTaskPool<O>, O>>>, workers: Vec<Arc<RwLock<Option<StealableTaskQueue<O>>>>>, worker_stealers: Vec<Arc<RwLock<Option<Stealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>>>>>, frees: ArrayQueue<usize>, waits: Option<Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>>, consume_count: Arc<AtomicUsize>, produce_count: Arc<AtomicUsize>, }
unsafe impl<O: Default + 'static> Send for StealableTaskPool<O> {}
unsafe impl<O: Default + 'static> Sync for StealableTaskPool<O> {}
impl<O: Default + 'static> Default for StealableTaskPool<O> {
fn default() -> Self {
StealableTaskPool::new()
}
}
impl<O: Default + 'static> AsyncTaskPool<O> for StealableTaskPool<O> {
type Pool = StealableTaskPool<O>;
#[inline]
fn get_thread_id(&self) -> usize {
match PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
unsafe { *thread_id.get() }
}) {
Err(e) => {
panic!("Get thread id failed, thread: {:?}, reason: {:?}", thread::current(), e);
},
Ok(id) => {
id
}
}
}
#[inline]
fn len(&self) -> usize {
if let Some(len) = self
.produce_count
.load(Ordering::Relaxed)
.checked_sub(self.consume_count.load(Ordering::Relaxed)) {
len
} else {
0
}
}
#[inline]
fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
self.public.push(task);
self.produce_count.fetch_add(1, Ordering::Relaxed);
Ok(())
}
#[inline]
fn push_timed_out(&self, _index: u64, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
let id = self.get_thread_id();
if let Some(worker) = &*(&self.workers[id]).read() {
worker.stack.push(task);
self.produce_count.fetch_add(1, Ordering::Relaxed);
return Ok(());
}
Err(Error::new(ErrorKind::Other,
format!("Push timed out failed, thread id: {}, reason: worker not exists", id)))
}
#[inline]
fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
self.push(task)
}
#[inline]
fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>> {
let id = self.get_thread_id();
if let Some(worker) = &*(&self.workers[id]).read() {
let task = worker.stack.pop();
if task.is_some() {
self.consume_count.fetch_add(1, Ordering::Relaxed);
return task;
}
let task = worker.queue.pop();
if task.is_some() {
self.consume_count.fetch_add(1, Ordering::Relaxed);
return task;
} else {
loop {
match self.public.steal_batch_and_pop(&worker.queue) {
Steal::Retry => {
continue;
},
Steal::Success(task) => {
self.consume_count.fetch_add(1, Ordering::Relaxed);
return Some(task);
},
Steal::Empty => {
break;
},
}
}
let mut steal_task = None; for index in 0..self.worker_stealers.len() {
if id == index {
continue;
}
if let Some(stealer) = &*self.worker_stealers[index].read() {
if stealer.is_empty() {
continue;
}
loop {
match stealer.steal() {
Steal::Retry => {
continue;
},
Steal::Success(task) => {
if steal_task.is_none() {
self.consume_count.fetch_add(1, Ordering::Relaxed);
steal_task = Some(task);
} else {
worker.queue.push(task);
}
},
Steal::Empty => {
break;
}
}
}
}
}
return steal_task;
}
}
None
}
#[inline]
fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>> {
let mut tasks = Vec::with_capacity(self.len());
while let Some(task) = self.try_pop() {
tasks.push(task);
}
tasks.into_iter()
}
#[inline]
fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>> {
None
}
}
impl<O: Default + 'static> AsyncTaskPoolExt<O> for StealableTaskPool<O> {
#[inline]
fn set_waits(&mut self,
waits: Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>) {
self.waits = Some(waits);
}
#[inline]
fn get_waits(&self) -> Option<&Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>> {
self.waits.as_ref()
}
#[inline]
fn idler_len(&self) -> usize {
self.frees.len()
}
#[inline]
fn spawn_worker(&self) -> Option<usize> {
if self.idler_len() == 0 {
return None;
}
if let Some(id) = self.frees.pop() {
if self.workers[id].read().is_some() || self.worker_stealers[id].read().is_some() {
return None;
}
let thread_waker = Arc::new((AtomicBool::new(false), Mutex::new(()), Condvar::new()));
let (worker, worker_stealer) = StealableTaskQueue::new(thread_waker);
*self.workers[id].write() = Some(worker);
*self.worker_stealers[id].write() = Some(worker_stealer);
return Some(id);
}
None
}
#[inline]
fn worker_len(&self) -> usize {
let mut workers_len = 0usize;
for worker in &self.workers {
if worker.read().is_some() {
workers_len += 1;
}
}
workers_len
.checked_sub(self.frees.len())
.or(Some(0))
.unwrap()
}
#[inline]
fn buffer_len(&self) -> usize {
self.public.len()
}
#[inline]
fn clone_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
let thread_id = self.get_thread_id();
if let Some(worker) = &*self.workers[thread_id].read() {
return Some(worker.thread_waker.clone());
}
None
}
#[inline]
fn close_worker(&self) {
let thread_id = self.get_thread_id();
let current = thread::current();
let _ = self.workers[thread_id].write().take();
warn!("Remove worker task pool ok, worker: {}, thread: {:?}", thread_id, current);
let _ = self.worker_stealers[thread_id].write().take();
warn!("Remove worker stealer ok, worker: {}, thread: {:?}", thread_id, current);
self.frees.push(thread_id);
}
}
impl<O: Default + 'static> StealableTaskPool<O> {
pub fn new() -> Self {
#[cfg(not(target_arch = "wasm32"))]
let size = num_cpus::get() * 2; #[cfg(target_arch = "wasm32")]
let size = 1; StealableTaskPool::with(DEFAULT_INIT_WORKER_SIZE, size)
}
pub fn with(init: usize, max: usize) -> Self {
if init == 0 || max == 0 || init > max {
panic!("Create MultiTaskPool failed, init: {}, max: {}, reason: invalid init or max", init, max);
}
let public = Injector::new();
let mut workers = Vec::with_capacity(max);
let mut worker_stealers = Vec::with_capacity(max);
let frees = ArrayQueue::new(max);
for _ in 0..init {
let thread_waker = Arc::new((AtomicBool::new(false), Mutex::new(()), Condvar::new()));
let (worker, worker_stealer) = StealableTaskQueue::new(thread_waker);
workers.push(Arc::new(RwLock::new(Some(worker))));
worker_stealers.push(Arc::new(RwLock::new(Some(worker_stealer))));
}
for index in init..max {
workers.push(Arc::new(RwLock::new(None)));
worker_stealers.push(Arc::new(RwLock::new(None)));
frees.push(index); }
let consume_count = Arc::new(AtomicUsize::new(0));
let produce_count = Arc::new(AtomicUsize::new(0));
StealableTaskPool {
public,
workers,
worker_stealers,
frees,
waits: None,
consume_count,
produce_count,
}
}
}
pub struct MultiTaskRuntime<
O: Default + 'static = (),
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = StealableTaskPool<O>
>(Arc<(
usize, //运行时唯一id
Arc<P>, //异步任务池
Option<Vec<(Sender<(usize, AsyncTimingTask<P, O>)>, Arc<Mutex<AsyncTaskTimer<P, O>>>)>>, //休眠的异步任务生产者和本地定时器
AtomicUsize, //定时任务计数器
Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>, //待唤醒的工作者唤醒器队列
(String, usize, usize, u64, Option<usize>), //当前运行时配置参数
)>);
unsafe impl<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
> Send for MultiTaskRuntime<O, P> {}
unsafe impl<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
> Sync for MultiTaskRuntime<O, P> {}
impl<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
> Clone for MultiTaskRuntime<O, P> {
fn clone(&self) -> Self {
MultiTaskRuntime(self.0.clone())
}
}
impl<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
> AsyncRuntime<O> for MultiTaskRuntime<O, P> {
type Pool = P;
fn shared_pool(&self) -> Arc<Self::Pool> {
(self.0).1.clone()
}
fn get_id(&self) -> usize {
(self.0).0
}
fn wait_len(&self) -> usize {
let mut len = 0;
if let Some(vec) = &(self.0).2 {
for (_, timer) in vec.iter() {
len += timer.lock().len();
}
}
len
}
fn len(&self) -> usize {
(self.0).1.len()
}
fn alloc(&self) -> TaskId {
TaskId(Arc::new(AtomicUsize::new(0)))
}
fn spawn<F>(&self, task_id: TaskId, future: F) -> Result<()>
where F: Future<Output = O> + Send + 'static {
let task = Arc::new(AsyncTask::new(task_id,
(self.0).1.clone(),
Some(future.boxed())));
let result = (self.0).1.push(task);
if let Some(worker_waker) = (self.0).4.pop() {
let (is_sleep, lock, condvar) = &*worker_waker;
let locked = lock.lock();
if is_sleep.load(Ordering::Relaxed) {
is_sleep.store(false, Ordering::SeqCst); condvar.notify_one();
}
} else {
let busy_size = (self.0).1.buffer_len();
if busy_size > 100 {
if let Some(index) = (self.0).1.spawn_worker() {
let builder = Builder::new()
.name(((self.0).5).0.clone() + "-" + index.to_string().as_str())
.stack_size(((self.0).5).2);
if let Some(timers) = &(self.0).2 {
let (_, timer) = &timers[index];
spawn_worker_thread(builder,
index,
self.clone(),
((self.0).5).1,
((self.0).5).3,
((self.0).5).4,
Some(timer.clone()));
} else {
spawn_worker_thread(builder,
index,
self.clone(),
((self.0).5).1,
((self.0).5).3,
((self.0).5).4,
None);
}
}
}
}
result
}
fn spawn_timing<F>(&self, task_id: TaskId, future: F, time: usize) -> Result<()>
where F: Future<Output = O> + Send + 'static {
if let Some(timers) = &(self.0).2 {
let mut index: usize = (self.0).3.fetch_add(1, Ordering::Relaxed) % timers.len(); let (_, timer) = &timers[index];
timer
.lock()
.set_timer(AsyncTimingTask::WaitRun(Arc::new(AsyncTask::new(task_id,
(self.0).1.clone(),
Some(future.boxed())))),
time);
return Ok(());
}
Err(Error::new(ErrorKind::Other, format!("Spawn timing task failed, task_id: {:?}, reason: timer not exist", task_id)))
}
fn pending<Output>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output> {
task_id.0.store(Box::into_raw(Box::new(waker)) as usize, Ordering::Relaxed);
Poll::Pending
}
fn wakeup(&self, task_id: &TaskId) {
match task_id.0.load(Ordering::Relaxed) {
0 => panic!("Multi runtime wakeup task failed, reason: task id not exist"),
ptr => {
unsafe {
let waker = Box::from_raw(ptr as *mut Waker);
waker.wake();
}
},
}
}
fn wait<V: Send + 'static>(&self) -> AsyncWait<V> {
AsyncWait(self.wait_any(2))
}
fn wait_any<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
let (producor, consumer) = async_bounded(capacity);
AsyncWaitAny {
capacity,
producor,
consumer,
}
}
fn wait_any_callback<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
let (producor, consumer) = async_bounded(capacity);
AsyncWaitAnyCallback {
capacity,
producor,
consumer,
}
}
fn map_reduce<V: Send + 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
let (producor, consumer) = async_bounded(capacity);
AsyncMapReduce {
count: 0,
capacity,
producor,
consumer,
}
}
fn timeout(&self, timeout: usize) -> BoxFuture<'static, ()> {
let rt = self.clone();
if let Some(timers) = &(self.0).2 {
match PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
let thread_id = unsafe { *thread_id.get() };
timers[thread_id].clone()
}) {
Err(_) => panic!("Multi thread runtime timeout failed, reason: local thread id not match"),
Ok((producor, _)) => {
AsyncWaitTimeout::new(rt,
producor,
timeout)
.boxed()
},
}
} else {
async move {
thread::sleep(Duration::from_millis(timeout as u64));
}.boxed()
}
}
fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> BoxStream<'static, FO>
where S: Stream<Item = SO> + Send + 'static,
SO: Send + 'static,
F: FnMut(SO) -> AsyncPipelineResult<FO> + Send + 'static,
FO: Send + 'static {
let output = stream! {
for await value in input {
match filter(value) {
AsyncPipelineResult::Disconnect => {
break;
},
AsyncPipelineResult::Filtered(result) => {
yield result;
},
}
}
};
output.boxed()
}
fn close(&self) -> bool {
false
}
}
impl<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
> AsyncRuntimeExt<O> for MultiTaskRuntime<O, P> {
fn spawn_with_context<F, C>(&self,
task_id: TaskId,
future: F,
context: C) -> Result<()>
where F: Future<Output = O> + Send + 'static,
C: 'static {
let task = Arc::new(AsyncTask::with_context(
task_id,
(self.0).1.clone(),
Some(future.boxed()),
context));
let result = (self.0).1.push(task);
if let Some(worker_waker) = (self.0).4.pop() {
let (is_sleep, lock, condvar) = &*worker_waker;
let locked = lock.lock();
if is_sleep.load(Ordering::Relaxed) {
is_sleep.store(false, Ordering::SeqCst); condvar.notify_one();
}
} else {
let busy_size = (self.0).1.buffer_len();
if busy_size > 100 {
if let Some(index) = (self.0).1.spawn_worker() {
let builder = Builder::new()
.name(((self.0).5).0.clone() + "-" + index.to_string().as_str())
.stack_size(((self.0).5).2);
if let Some(timers) = &(self.0).2 {
let (_, timer) = &timers[index];
spawn_worker_thread(builder,
index,
self.clone(),
((self.0).5).1,
((self.0).5).3,
((self.0).5).4,
Some(timer.clone()));
} else {
spawn_worker_thread(builder,
index,
self.clone(),
((self.0).5).1,
((self.0).5).3,
((self.0).5).4,
None);
}
}
}
}
result
}
fn spawn_timing_with_context<F, C>(&self,
task_id: TaskId,
future: F,
context: C,
time: usize) -> Result<()>
where F: Future<Output = O> + Send + 'static,
C: 'static {
if let Some(timers) = &(self.0).2 {
let mut index: usize = (self.0).3.fetch_add(1, Ordering::Relaxed) % timers.len(); let (_, timer) = &timers[index];
timer
.lock()
.set_timer(AsyncTimingTask::WaitRun(Arc::new(AsyncTask::with_context(task_id,
(self.0).1.clone(),
Some(future.boxed()),
context))),
time);
return Ok(());
}
Err(Error::new(ErrorKind::Other, format!("Spawn timing task failed, task_id: {:?}, reason: timer not exist", task_id)))
}
fn block_on<F>(&self, future: F) -> Result<F::Output>
where F: Future + Send + 'static,
<F as Future>::Output: Default + Send + 'static {
if let Some(local_rt) = local_async_runtime::<F::Output>() {
if local_rt.get_id() == self.get_id() {
return Err(Error::new(ErrorKind::WouldBlock, format!("Block on failed, reason: would block")));
}
}
let (sender, receiver) = bounded(1);
if let Err(e) = self.spawn(self.alloc(), async move {
let r = future.await;
sender.send(r);
Default::default()
}) {
return Err(Error::new(ErrorKind::Other, format!("Block on failed, reason: {:?}", e)));
}
match receiver.recv() {
Err(e) => {
Err(Error::new(ErrorKind::Other, format!("Block on failed, reason: {:?}", e)))
},
Ok(result) => {
Ok(result)
},
}
}
}
impl<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
> MultiTaskRuntime<O, P> {
pub fn idler_len(&self) -> usize {
(self.0).1.idler_len()
}
pub fn worker_len(&self) -> usize {
(self.0).1.worker_len()
}
pub fn worker_timing_len(&self, index: usize) -> usize {
if let Some(vec) = &(self.0).2 {
let (_, timer) = &vec[index];
return timer.lock().len();
}
0
}
pub fn buffer_len(&self) -> usize {
(self.0).1.buffer_len()
}
pub fn to_local_runtime(&self) -> LocalAsyncRuntime<O> {
LocalAsyncRuntime {
inner: self.as_raw(),
get_id_func: MultiTaskRuntime::<O, P>::get_id_raw,
spawn_func: MultiTaskRuntime::<O, P>::spawn_raw,
spawn_timing_func: MultiTaskRuntime::<O, P>::spawn_timing_raw,
timeout_func: MultiTaskRuntime::<O, P>::timeout_raw,
}
}
#[inline]
pub(crate) fn as_raw(&self) -> *const () {
Arc::into_raw(self.0.clone()) as *const ()
}
#[inline]
pub(crate) fn from_raw(raw: *const ()) -> Self {
let inner = unsafe {
Arc::from_raw(raw as *const (
usize,
Arc<P>,
Option<Vec<(Sender<(usize, AsyncTimingTask<P, O>)>, Arc<Mutex<AsyncTaskTimer<P, O>>>)>>,
AtomicUsize,
Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>,
(String, usize, usize, u64, Option<usize>),
))
};
MultiTaskRuntime(inner)
}
pub(crate) fn get_id_raw(raw: *const ()) -> usize {
let rt = MultiTaskRuntime::<O, P>::from_raw(raw);
let id = rt.get_id();
Arc::into_raw(rt.0); id
}
pub(crate) fn spawn_raw<F>(raw: *const (),
future: F) -> Result<()>
where F: Future<Output = O> + Send + 'static {
let rt = MultiTaskRuntime::<O, P>::from_raw(raw);
let result = rt.spawn(rt.alloc(), future);
Arc::into_raw(rt.0); result
}
pub(crate) fn spawn_timing_raw(raw: *const (),
future: BoxFuture<'static, O>,
timeout: usize) -> Result<()> {
let rt = MultiTaskRuntime::<O, P>::from_raw(raw);
let result = rt.spawn_timing(rt.alloc(), future, timeout);
Arc::into_raw(rt.0); result
}
pub(crate) fn timeout_raw(raw: *const (), timeout: usize) -> BoxFuture<'static, ()> {
let rt = MultiTaskRuntime::<O, P>::from_raw(raw);
let boxed = rt.timeout(timeout);
Arc::into_raw(rt.0); boxed
}
}
pub struct MultiTaskRuntimeBuilder<
O: Default + 'static = (),
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = StealableTaskPool<O>
> {
pool: P, prefix: String, init: usize, min: usize, max: usize, stack_size: usize, timeout: u64, interval: Option<usize>, marker: PhantomData<O>,
}
unsafe impl<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
> Send for MultiTaskRuntimeBuilder<O, P> {}
unsafe impl<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
> Sync for MultiTaskRuntimeBuilder<O, P> {}
impl<O: Default + 'static> Default for MultiTaskRuntimeBuilder<O> {
fn default() -> Self {
#[cfg(not(target_arch = "wasm32"))]
let core_len = num_cpus::get(); #[cfg(target_arch = "wasm32")]
let core_len = 1; let pool = StealableTaskPool::with(core_len, core_len);
MultiTaskRuntimeBuilder::new(pool)
.thread_stack_size(2 * 1024 * 1024)
.set_timer_interval(1)
}
}
impl<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
> MultiTaskRuntimeBuilder<O, P> {
pub fn new(mut pool: P) -> Self {
#[cfg(not(target_arch = "wasm32"))]
let core_len = num_cpus::get(); #[cfg(target_arch = "wasm32")]
let core_len = 1;
MultiTaskRuntimeBuilder {
pool,
prefix: DEFAULT_WORKER_THREAD_PREFIX.to_string(),
init: core_len,
min: core_len,
max: core_len,
stack_size: DEFAULT_THREAD_STACK_SIZE,
timeout: DEFAULT_WORKER_THREAD_SLEEP_TIME,
interval: None,
marker: PhantomData,
}
}
pub fn thread_prefix(mut self, prefix: &str) -> Self {
self.prefix = prefix.to_string();
self
}
pub fn thread_stack_size(mut self, stack_size: usize) -> Self {
self.stack_size = stack_size;
self
}
pub fn init_worker_size(mut self, mut init: usize) -> Self {
if init == 0 {
init = DEFAULT_INIT_WORKER_SIZE;
}
self.init = init;
self
}
pub fn set_worker_limit(mut self, mut min: usize, mut max: usize) -> Self {
if self.init > max {
max = self.init;
}
if min == 0 || min > max {
min = max;
}
self.min = min;
self.max = max;
self
}
pub fn set_timeout(mut self, timeout: u64) -> Self {
self.timeout = timeout;
self
}
pub fn set_timer_interval(mut self, interval: usize) -> Self {
self.interval = Some(interval);
self
}
pub fn build(mut self) -> MultiTaskRuntime<O, P> {
let interval = self.interval;
let mut timers = if let Some(_) = interval {
Some(Vec::with_capacity(self.max))
} else {
None
};
for _ in 0..self.max {
if let Some(vec) = &mut timers {
let timer = AsyncTaskTimer::new();
let producor = timer.producor.clone();
let timer = Arc::new(Mutex::new(timer));
vec.push((producor, timer));
};
}
let rt_uid = alloc_rt_uid();
let waits = Arc::new(ArrayQueue::new(self.max));
let mut pool = self.pool;
pool.set_waits(waits.clone()); let pool = Arc::new(pool);
let runtime = MultiTaskRuntime(Arc::new((
rt_uid,
pool,
timers,
AtomicUsize::new(0),
waits,
(self.prefix.clone(), self.min, self.stack_size, self.timeout, self.interval),
)));
let mut builders = Vec::with_capacity(self.init);
for index in 0..self.init {
let builder = Builder::new()
.name(self.prefix.clone() + "-" + index.to_string().as_str())
.stack_size(self.stack_size);
builders.push(builder);
}
let min = self.min;
for index in 0..builders.len() {
let builder = builders.remove(0);
let runtime = runtime.clone();
let timeout = self.timeout;
let timer = if let Some(timers) = &(runtime.0).2 {
let (_, timer) = &timers[index];
Some(timer.clone())
} else {
None
};
spawn_worker_thread(builder, index, runtime, min, timeout, interval, timer);
}
runtime
}
}
fn spawn_worker_thread<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
>(builder: Builder,
index: usize,
runtime: MultiTaskRuntime<O, P>,
min: usize,
timeout: u64,
interval: Option<usize>,
timer: Option<Arc<Mutex<AsyncTaskTimer<P, O>>>>) {
if let Some(timer) = timer {
let _ = builder.spawn(move || {
if let Err(e) = PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
unsafe { *thread_id.get() = index; }
}) {
panic!("Multi thread runtime startup failed, thread id: {:?}, reason: {:?}", index, e);
}
let runtime_copy = runtime.clone();
match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |rt| {
let raw = Arc::into_raw(Arc::new(runtime_copy.to_local_runtime())) as *mut LocalAsyncRuntime<O> as *mut ();
rt.store(raw, Ordering::Relaxed);
}) {
Err(e) => {
panic!("Bind multi runtime to local thread failed, reason: {:?}", e);
},
Ok(_) => (),
}
timer_work_loop(runtime,
index,
min,
timeout,
interval.unwrap() as u64,
timer);
});
} else {
let _ = builder.spawn(move || {
if let Err(e) = PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
unsafe { *thread_id.get() = index; }
}) {
panic!("Multi thread runtime startup failed, thread id: {:?}, reason: {:?}", index, e);
}
let runtime_copy = runtime.clone();
match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |rt| {
let raw = Arc::into_raw(Arc::new(runtime_copy.to_local_runtime())) as *mut LocalAsyncRuntime<O> as *mut ();
rt.store(raw, Ordering::Relaxed);
}) {
Err(e) => {
panic!("Bind multi runtime to local thread failed, reason: {:?}", e);
},
Ok(_) => (),
}
work_loop(runtime, index, min, timeout);
});
}
}
fn timer_work_loop<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
>(runtime: MultiTaskRuntime<O, P>,
index: usize,
min: usize,
sleep_timeout: u64,
timer_interval: u64,
timer: Arc<Mutex<AsyncTaskTimer<P, O>>>) {
let pool = (runtime.0).1.clone();
let worker_waker = pool.clone_thread_waker().unwrap();
let mut sleep_count = 0; loop {
let mut timer_run_millis = Instant::now(); timer.lock().consume(); loop {
let current_time = timer.lock().is_require_pop(); if let Some(current_time) = current_time {
loop {
let timed_out = timer.lock().pop(current_time); if let Some((handle, timing_task)) = timed_out {
match timing_task {
AsyncTimingTask::Pended(expired) => {
runtime.wakeup(&expired);
},
AsyncTimingTask::WaitRun(expired) => {
(runtime.0).1.push_timed_out(handle as u64, expired);
if let Some(task) = pool.try_pop() {
sleep_count = 0; run_task(&runtime, task);
}
},
}
if let Some(task) = pool.try_pop() {
sleep_count = 0; run_task(&runtime, task);
}
} else {
break;
}
}
} else {
break;
}
}
match pool.try_pop() {
None => {
if runtime.len() > 0 {
continue;
}
if sleep_count > 2 {
if is_closeable(&runtime, min) {
break;
}
sleep_count = 0;
}
{
let (is_sleep, lock, condvar) = &*worker_waker;
let mut locked = lock.lock();
is_sleep.store(true, Ordering::SeqCst);
let diff_time = Instant::now()
.duration_since(timer_run_millis)
.as_millis() as u64; let real_timeout = if timer.lock().len() == 0 {
sleep_timeout
} else {
if diff_time >= timer_interval {
continue;
} else {
timer_interval - diff_time
}
};
(runtime.0).4.push(worker_waker.clone());
if condvar
.wait_for(
&mut locked,
Duration::from_millis(real_timeout),
)
.timed_out()
{
is_sleep.store(false, Ordering::SeqCst);
sleep_count += 1;
}
}
},
Some(task) => {
sleep_count = 0; run_task(&runtime, task);
},
}
}
(runtime.0).1.close_worker();
warn!("Worker of runtime closed, runtime: {}, worker: {}, thread: {:?}",
runtime.get_id(),
index,
thread::current());
}
fn work_loop<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
>(runtime: MultiTaskRuntime<O, P>, index: usize, min: usize, sleep_timeout: u64) {
let pool = (runtime.0).1.clone();
let worker_waker = pool.clone_thread_waker().unwrap();
let mut sleep_count = 0; loop {
match pool.try_pop() {
None => {
if runtime.len() > 0 {
continue;
}
if sleep_count > 2 {
if is_closeable(&runtime, min) {
break;
}
sleep_count = 0;
}
{
let (is_sleep, lock, condvar) = &*worker_waker;
let mut locked = lock.lock();
is_sleep.store(true, Ordering::SeqCst);
(runtime.0).4.push(worker_waker.clone());
if condvar
.wait_for(
&mut locked,
Duration::from_millis(sleep_timeout),
)
.timed_out()
{
is_sleep.store(false, Ordering::SeqCst);
sleep_count += 1;
}
}
},
Some(task) => {
sleep_count = 0; run_task(&runtime, task);
},
}
}
(runtime.0).1.close_worker();
warn!("Worker of runtime closed, runtime: {}, worker: {}, thread: {:?}",
runtime.get_id(),
index,
thread::current());
}
#[inline]
fn is_closeable<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
>(runtime: &MultiTaskRuntime<O, P>, min: usize) -> bool {
match PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
unsafe { *thread_id.get() }
}) {
Err(_) => {
true
},
Ok(thread_id) => {
if runtime.worker_timing_len(thread_id) > 0 {
false
} else {
if runtime.worker_len() <= min {
false
} else {
if runtime.buffer_len() > 0 {
false
} else {
true
}
}
}
},
}
}
#[inline]
fn run_task<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
>(runtime: &MultiTaskRuntime<O, P>, task: Arc<AsyncTask<P, O>>) {
let waker = waker_ref(&task);
let mut context = Context::from_waker(&*waker);
if let Some(mut future) = task.get_inner() {
if let Poll::Pending = future.as_mut().poll(&mut context) {
task.set_inner(Some(future));
}
} else {
(runtime.0).1.push(task);
}
}
#[test]
fn test_mutli_task_pool() {
use std::time::Instant;
let pool = Arc::new(StealableTaskPool::with(8, 8));
println!("!!!!!!pool len: {}", pool.len());
let pool0 = pool.clone();
let pool1 = pool.clone();
let pool2 = pool.clone();
let pool3 = pool.clone();
let pool4 = pool.clone();
let pool5 = pool.clone();
let pool6 = pool.clone();
let pool7 = pool.clone();
let pool00 = pool.clone();
let pool01 = pool.clone();
let pool02 = pool.clone();
let pool03 = pool.clone();
let pool04 = pool.clone();
let pool05 = pool.clone();
let pool06 = pool.clone();
let pool07 = pool.clone();
let start = Instant::now();
thread::spawn(move || {
for _ in 0..2000000 {
let task = AsyncTask::new(
TaskId(Arc::new(AtomicUsize::new(0))),
pool0.clone(),
Some(async move {}.boxed()));
pool0.push(Arc::new(task));
}
});
thread::spawn(move || {
for _ in 0..2000000 {
let task = AsyncTask::new(
TaskId(Arc::new(AtomicUsize::new(0))),
pool1.clone(),
Some(async move {}.boxed()));
pool1.push(Arc::new(task));
}
});
thread::spawn(move || {
for _ in 0..2000000 {
let task = AsyncTask::new(
TaskId(Arc::new(AtomicUsize::new(0))),
pool2.clone(),
Some(async move {}.boxed()));
pool2.push(Arc::new(task));
}
});
thread::spawn(move || {
for _ in 0..2000000 {
let task = AsyncTask::new(
TaskId(Arc::new(AtomicUsize::new(0))),
pool3.clone(),
Some(async move {}.boxed()));
pool3.push(Arc::new(task));
}
});
thread::spawn(move || {
for _ in 0..2000000 {
let task = AsyncTask::new(
TaskId(Arc::new(AtomicUsize::new(0))),
pool4.clone(),
Some(async move {}.boxed()));
pool4.push(Arc::new(task));
}
});
thread::spawn(move || {
for _ in 0..2000000 {
let task = AsyncTask::new(
TaskId(Arc::new(AtomicUsize::new(0))),
pool5.clone(),
Some(async move {}.boxed()));
pool5.push(Arc::new(task));
}
});
thread::spawn(move || {
for _ in 0..2000000 {
let task = AsyncTask::new(
TaskId(Arc::new(AtomicUsize::new(0))),
pool6.clone(),
Some(async move {}.boxed()));
pool6.push(Arc::new(task));
}
});
thread::spawn(move || {
for _ in 0..2000000 {
let task = AsyncTask::new(
TaskId(Arc::new(AtomicUsize::new(0))),
pool7.clone(),
Some(async move {}.boxed()));
pool7.push(Arc::new(task));
}
});
let join0 = thread::spawn(move || {
PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
unsafe { *thread_id.get() = 0; }
});
let mut count = 0;
loop {
if let None = pool00.try_pop() {
thread::sleep(Duration::from_millis(10));
if pool00.len() == 0 {
break;
} else {
continue;
}
}
count += 1;
}
println!("!!!!!!pool00 count: {}", count);
});
let join1 = thread::spawn(move || {
PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
unsafe { *thread_id.get() = 1; }
});
let mut count = 0;
loop {
if let None = pool01.try_pop() {
thread::sleep(Duration::from_millis(10));
if pool01.len() == 0 {
break;
} else {
continue;
}
}
count += 1;
}
println!("!!!!!!pool01 count: {}", count);
});
let join2 = thread::spawn(move || {
PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
unsafe { *thread_id.get() = 2; }
});
let mut count = 0;
loop {
if let None = pool02.try_pop() {
thread::sleep(Duration::from_millis(10));
if pool02.len() == 0 {
break;
} else {
continue;
}
}
count += 1;
}
println!("!!!!!!pool02 count: {}", count);
});
let join3 = thread::spawn(move || {
PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
unsafe { *thread_id.get() = 3; }
});
let mut count = 0;
loop {
if let None = pool03.try_pop() {
thread::sleep(Duration::from_millis(10));
if pool03.len() == 0 {
break;
} else {
continue;
}
}
count += 1;
}
println!("!!!!!!pool03 count: {}", count);
});
let join4 = thread::spawn(move || {
PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
unsafe { *thread_id.get() = 4; }
});
let mut count = 0;
loop {
if let None = pool04.try_pop() {
thread::sleep(Duration::from_millis(10));
if pool04.len() == 0 {
break;
} else {
continue;
}
}
count += 1;
}
println!("!!!!!!pool04 count: {}", count);
});
let join5 = thread::spawn(move || {
PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
unsafe { *thread_id.get() = 5; }
});
let mut count = 0;
loop {
if let None = pool05.try_pop() {
thread::sleep(Duration::from_millis(10));
if pool05.len() == 0 {
break;
} else {
continue;
}
}
count += 1;
}
println!("!!!!!!pool05 count: {}", count);
});
let join6 = thread::spawn(move || {
PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
unsafe { *thread_id.get() = 6; }
});
let mut count = 0;
loop {
if let None = pool06.try_pop() {
thread::sleep(Duration::from_millis(10));
if pool06.len() == 0 {
break;
} else {
continue;
}
}
count += 1;
}
println!("!!!!!!pool06 count: {}", count);
});
let join7 = thread::spawn(move || {
PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
unsafe { *thread_id.get() = 7; }
});
let mut count = 0;
loop {
if let None = pool07.try_pop() {
thread::sleep(Duration::from_millis(10));
if pool07.len() == 0 {
break;
} else {
continue;
}
}
count += 1;
}
println!("!!!!!!pool07 count: {}", count);
});
join0.join();
join1.join();
join2.join();
join3.join();
join4.join();
join5.join();
join6.join();
join7.join();
println!("pool len: {}, time: {:?}", pool.len(), Instant::now() - start);
}
#[test]
fn test_computational_runtime() {
use std::mem;
use std::time::Instant;
use env_logger;
use crate::rt::{spawn_local,
get_local_dict,
get_local_dict_mut,
set_local_dict,
remove_local_dict,
clear_local_dict};
env_logger::init();
struct AtomicCounter(AtomicUsize, Instant);
impl Drop for AtomicCounter {
fn drop(&mut self) {
unsafe {
println!("!!!!!!drop counter, count: {:?}, time: {:?}", self.0.load(Ordering::Relaxed), Instant::now() - self.1);
}
}
}
let pool = ComputationalTaskPool::new(8);
let builer = MultiTaskRuntimeBuilder::new(pool)
.set_timer_interval(1);
let rt = builer.build();
let rt0 = rt.clone();
let rt1 = rt.clone();
let rt2 = rt.clone();
let rt3 = rt.clone();
let rt4 = rt.clone();
let rt5 = rt.clone();
let rt6 = rt.clone();
let rt7 = rt.clone();
let counter = Arc::new(AtomicCounter(AtomicUsize::new(0), Instant::now()));
let count0 = counter.clone();
let count1 = counter.clone();
let count2 = counter.clone();
let count3 = counter.clone();
let count4 = counter.clone();
let count5 = counter.clone();
let count6 = counter.clone();
let count7 = counter.clone();
mem::drop(counter);
rt.spawn(rt.alloc(), async move {
use crate::rt::spawn_local;
if let Err(e) = spawn_local(async move {
println!("Test spawn local ok");
}) {
println!("Test spawn local failed, reason: {:?}", e);
}
});
let rt_copy = rt.clone();
let thread_handle = thread::spawn(move || {
match rt_copy.block_on(async move {
set_local_dict::<usize>(0);
println!("get local dict, init value: {}", *get_local_dict::<usize>().unwrap());
*get_local_dict_mut::<usize>().unwrap() = 0xffffffff;
println!("get local dict, value after modify: {}", *get_local_dict::<usize>().unwrap());
if let Some(value) = remove_local_dict::<usize>() {
println!("get local dict, value after remove: {:?}, last value: {}", get_local_dict::<usize>(), value);
}
set_local_dict::<usize>(0);
clear_local_dict();
println!("get local dict, value after clear: {:?}", get_local_dict::<usize>());
"Test block on ok".to_string()
}) {
Err(e) => {
println!("Test block on failed, reason: {:?}", e);
},
Ok(r) => {
println!("{}", r);
},
}
});
thread_handle.join();
thread::spawn(move || {
let start = Instant::now();
for _ in 0..2000000 {
let counter_copy = count0.clone();
if let Err(e) = rt0.spawn(rt0.alloc(), async move {
counter_copy.0.fetch_add(1, Ordering::Relaxed);
}) {
println!("!!!> spawn multi task failed, reason: {:?}", e);
}
}
println!("!!!!!!spawn multi task ok, time: {:?}", Instant::now() - start);
});
thread::spawn(move || {
let start = Instant::now();
for _ in 0..2000000 {
let counter_copy = count1.clone();
if let Err(e) = rt1.spawn(rt1.alloc(), async move {
counter_copy.0.fetch_add(1, Ordering::Relaxed);
}) {
println!("!!!> spawn multi task failed, reason: {:?}", e);
}
}
println!("!!!!!!spawn multi task ok, time: {:?}", Instant::now() - start);
});
thread::spawn(move || {
let start = Instant::now();
for _ in 0..2000000 {
let counter_copy = count2.clone();
if let Err(e) = rt2.spawn(rt2.alloc(), async move {
counter_copy.0.fetch_add(1, Ordering::Relaxed);
}) {
println!("!!!> spawn multi task failed, reason: {:?}", e);
}
}
println!("!!!!!!spawn multi task ok, time: {:?}", Instant::now() - start);
});
thread::spawn(move || {
let start = Instant::now();
for _ in 0..2000000 {
let counter_copy = count3.clone();
if let Err(e) = rt3.spawn(rt3.alloc(), async move {
counter_copy.0.fetch_add(1, Ordering::Relaxed);
}) {
println!("!!!> spawn multi task failed, reason: {:?}", e);
}
}
println!("!!!!!!spawn multi task ok, time: {:?}", Instant::now() - start);
});
thread::spawn(move || {
let start = Instant::now();
for _ in 0..2000000 {
let counter_copy = count4.clone();
if let Err(e) = rt4.spawn(rt4.alloc(), async move {
counter_copy.0.fetch_add(1, Ordering::Relaxed);
}) {
println!("!!!> spawn multi task failed, reason: {:?}", e);
}
}
println!("!!!!!!spawn multi task ok, time: {:?}", Instant::now() - start);
});
thread::spawn(move || {
let start = Instant::now();
for _ in 0..2000000 {
let counter_copy = count5.clone();
if let Err(e) = rt5.spawn(rt5.alloc(), async move {
counter_copy.0.fetch_add(1, Ordering::Relaxed);
}) {
println!("!!!> spawn multi task failed, reason: {:?}", e);
}
}
println!("!!!!!!spawn multi task ok, time: {:?}", Instant::now() - start);
});
thread::spawn(move || {
let start = Instant::now();
for _ in 0..2000000 {
let counter_copy = count6.clone();
if let Err(e) = rt6.spawn(rt6.alloc(), async move {
counter_copy.0.fetch_add(1, Ordering::Relaxed);
}) {
println!("!!!> spawn multi task failed, reason: {:?}", e);
}
}
println!("!!!!!!spawn multi task ok, time: {:?}", Instant::now() - start);
});
thread::spawn(move || {
let start = Instant::now();
for _ in 0..2000000 {
let counter_copy = count7.clone();
if let Err(e) = rt7.spawn(rt7.alloc(), async move {
counter_copy.0.fetch_add(1, Ordering::Relaxed);
}) {
println!("!!!> spawn multi task failed, reason: {:?}", e);
}
}
println!("!!!!!!spawn multi task ok, time: {:?}", Instant::now() - start);
});
thread::sleep(Duration::from_millis(1000000000));
}
#[test]
fn test_stealable_runtime() {
use std::mem;
use std::time::Instant;
use env_logger;
use crate::rt::{spawn_local,
get_local_dict,
get_local_dict_mut,
set_local_dict,
remove_local_dict,
clear_local_dict};
env_logger::init();
struct AtomicCounter(AtomicUsize, Instant);
impl Drop for AtomicCounter {
fn drop(&mut self) {
unsafe {
println!("!!!!!!drop counter, count: {:?}, time: {:?}", self.0.load(Ordering::Relaxed), Instant::now() - self.1);
}
}
}
let pool = StealableTaskPool::with(8, 8);
let builer = MultiTaskRuntimeBuilder::new(pool)
.set_timer_interval(1);
let rt = builer.build();
let rt0 = rt.clone();
let rt1 = rt.clone();
let rt2 = rt.clone();
let rt3 = rt.clone();
let rt4 = rt.clone();
let rt5 = rt.clone();
let rt6 = rt.clone();
let rt7 = rt.clone();
let counter = Arc::new(AtomicCounter(AtomicUsize::new(0), Instant::now()));
let count0 = counter.clone();
let count1 = counter.clone();
let count2 = counter.clone();
let count3 = counter.clone();
let count4 = counter.clone();
let count5 = counter.clone();
let count6 = counter.clone();
let count7 = counter.clone();
mem::drop(counter);
rt.spawn(rt.alloc(), async move {
use crate::rt::spawn_local;
if let Err(e) = spawn_local(async move {
println!("Test spawn local ok");
}) {
println!("Test spawn local failed, reason: {:?}", e);
}
});
let rt_copy = rt.clone();
let thread_handle = thread::spawn(move || {
match rt_copy.block_on(async move {
set_local_dict::<usize>(0);
println!("get local dict, init value: {}", *get_local_dict::<usize>().unwrap());
*get_local_dict_mut::<usize>().unwrap() = 0xffffffff;
println!("get local dict, value after modify: {}", *get_local_dict::<usize>().unwrap());
if let Some(value) = remove_local_dict::<usize>() {
println!("get local dict, value after remove: {:?}, last value: {}", get_local_dict::<usize>(), value);
}
set_local_dict::<usize>(0);
clear_local_dict();
println!("get local dict, value after clear: {:?}", get_local_dict::<usize>());
"Test block on ok".to_string()
}) {
Err(e) => {
println!("Test block on failed, reason: {:?}", e);
},
Ok(r) => {
println!("{}", r);
},
}
});
thread_handle.join();
thread::spawn(move || {
let start = Instant::now();
for _ in 0..2000000 {
let counter_copy = count0.clone();
if let Err(e) = rt0.spawn(rt0.alloc(), async move {
counter_copy.0.fetch_add(1, Ordering::Relaxed);
}) {
println!("!!!> spawn multi task failed, reason: {:?}", e);
}
}
println!("!!!!!!spawn multi task ok, time: {:?}", Instant::now() - start);
});
thread::spawn(move || {
let start = Instant::now();
for _ in 0..2000000 {
let counter_copy = count1.clone();
if let Err(e) = rt1.spawn(rt1.alloc(), async move {
counter_copy.0.fetch_add(1, Ordering::Relaxed);
}) {
println!("!!!> spawn multi task failed, reason: {:?}", e);
}
}
println!("!!!!!!spawn multi task ok, time: {:?}", Instant::now() - start);
});
thread::spawn(move || {
let start = Instant::now();
for _ in 0..2000000 {
let counter_copy = count2.clone();
if let Err(e) = rt2.spawn(rt2.alloc(), async move {
counter_copy.0.fetch_add(1, Ordering::Relaxed);
}) {
println!("!!!> spawn multi task failed, reason: {:?}", e);
}
}
println!("!!!!!!spawn multi task ok, time: {:?}", Instant::now() - start);
});
thread::spawn(move || {
let start = Instant::now();
for _ in 0..2000000 {
let counter_copy = count3.clone();
if let Err(e) = rt3.spawn(rt3.alloc(), async move {
counter_copy.0.fetch_add(1, Ordering::Relaxed);
}) {
println!("!!!> spawn multi task failed, reason: {:?}", e);
}
}
println!("!!!!!!spawn multi task ok, time: {:?}", Instant::now() - start);
});
thread::spawn(move || {
let start = Instant::now();
for _ in 0..2000000 {
let counter_copy = count4.clone();
if let Err(e) = rt4.spawn(rt4.alloc(), async move {
counter_copy.0.fetch_add(1, Ordering::Relaxed);
}) {
println!("!!!> spawn multi task failed, reason: {:?}", e);
}
}
println!("!!!!!!spawn multi task ok, time: {:?}", Instant::now() - start);
});
thread::spawn(move || {
let start = Instant::now();
for _ in 0..2000000 {
let counter_copy = count5.clone();
if let Err(e) = rt5.spawn(rt5.alloc(), async move {
counter_copy.0.fetch_add(1, Ordering::Relaxed);
}) {
println!("!!!> spawn multi task failed, reason: {:?}", e);
}
}
println!("!!!!!!spawn multi task ok, time: {:?}", Instant::now() - start);
});
thread::spawn(move || {
let start = Instant::now();
for _ in 0..2000000 {
let counter_copy = count6.clone();
if let Err(e) = rt6.spawn(rt6.alloc(), async move {
counter_copy.0.fetch_add(1, Ordering::Relaxed);
}) {
println!("!!!> spawn multi task failed, reason: {:?}", e);
}
}
println!("!!!!!!spawn multi task ok, time: {:?}", Instant::now() - start);
});
thread::spawn(move || {
let start = Instant::now();
for _ in 0..2000000 {
let counter_copy = count7.clone();
if let Err(e) = rt7.spawn(rt7.alloc(), async move {
counter_copy.0.fetch_add(1, Ordering::Relaxed);
}) {
println!("!!!> spawn multi task failed, reason: {:?}", e);
}
}
println!("!!!!!!spawn multi task ok, time: {:?}", Instant::now() - start);
});
thread::sleep(Duration::from_millis(1000000000));
}