use std::any::Any;
use std::vec::IntoIter;
use std::cell::RefCell;
use std::future::Future;
use std::mem::transmute;
use std::cell::UnsafeCell;
use std::sync::{Arc, Weak};
use std::task::{Waker, Context, Poll};
use std::io::{Error, Result, ErrorKind};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use parking_lot::{Mutex, Condvar};
use crossbeam_channel::{Sender, bounded, unbounded};
use flume::bounded as async_bounded;
use futures::{future::{FutureExt, LocalBoxFuture},
stream::{Stream, StreamExt, LocalBoxStream},
task::{ArcWake, waker_ref}};
use async_stream::stream;
use crate::{lock::{spin,
mpsc_deque::{Sender as MpscSent, Receiver as MpscRecv, mpsc_deque}},
rt::{TaskId, AsyncPipelineResult, alloc_rt_uid,
serial::{AsyncRuntime,
AsyncRuntimeExt,
AsyncTaskPool,
AsyncTaskPoolExt,
AsyncTask,
AsyncTaskTimer,
AsyncWaitTimeout,
AsyncWaitResult,
AsyncTimingTask,
AsyncWait,
AsyncWaitAny,
AsyncWaitAnyCallback,
AsyncMapReduce,
LocalAsyncRuntime,
bind_local_thread,
local_async_runtime}}};
/// Task pool used by the single-threaded runtime.
///
/// Tasks are handed in through the producer half of an `mpsc_deque` and taken
/// out again through the consumer half. Two counters track how many tasks
/// were produced and consumed so that `len` can be computed without touching
/// the queue.
pub struct SingleTaskPool<O: Default + 'static> {
    /// Identifier reported by `get_thread_id`.
    id: usize,
    /// Receiving half of the task queue; also used to push timed-out tasks
    /// to the front of the queue.
    consumer: Arc<RefCell<MpscRecv<Arc<AsyncTask<SingleTaskPool<O>, O>>>>>,
    /// Sending half of the task queue.
    producer: Arc<MpscSent<Arc<AsyncTask<SingleTaskPool<O>, O>>>>,
    /// Number of tasks taken out of the pool so far.
    consume_count: Arc<AtomicUsize>,
    /// Number of tasks put into the pool so far.
    produce_count: Arc<AtomicUsize>,
    /// Optional thread waker exposed through `get_thread_waker`.
    thread_waker: Option<Arc<(AtomicBool, Mutex<()>, Condvar)>>,
}
// SAFETY: NOTE(review): the pool stores its consumer in a `RefCell`, which is
// not `Sync` by itself. These impls therefore assert that the consumer side
// (`try_pop`, `try_pop_all`, `push_timed_out`) is never used from two threads
// at the same time - confirm against the callers.
unsafe impl<O> Send for SingleTaskPool<O>
where
    O: Default + 'static,
{
}

unsafe impl<O> Sync for SingleTaskPool<O>
where
    O: Default + 'static,
{
}
impl<O: Default + 'static> Clone for SingleTaskPool<O> {
fn clone(&self) -> Self {
SingleTaskPool {
id: self.id,
consumer: self.consumer.clone(),
producer: self.producer.clone(),
consume_count: self.consume_count.clone(),
produce_count: self.produce_count.clone(),
thread_waker: self.thread_waker.clone(),
}
}
}
impl<O> Default for SingleTaskPool<O>
where
    O: Default + 'static,
{
    /// Builds an empty pool with a freshly allocated runtime uid, zeroed
    /// counters and a new thread waker.
    fn default() -> Self {
        let rt_uid = alloc_rt_uid();
        let (producer, consumer) = mpsc_deque();

        Self {
            // The id is derived from the allocated uid: shifted left by 8,
            // masked to 16 bits, with the lowest bit set.
            id: ((rt_uid << 8) & 0xffff) | 1,
            consumer: Arc::new(RefCell::new(consumer)),
            producer: Arc::new(producer),
            consume_count: Arc::new(AtomicUsize::new(0)),
            produce_count: Arc::new(AtomicUsize::new(0)),
            thread_waker: Some(Arc::new((
                AtomicBool::new(false),
                Mutex::new(()),
                Condvar::new(),
            ))),
        }
    }
}
impl<O: Default + 'static> AsyncTaskPool<O> for SingleTaskPool<O> {
type Pool = SingleTaskPool<O>;
#[inline]
fn get_thread_id(&self) -> usize {
self.id
}
#[inline]
fn len(&self) -> usize {
if let Some(len) = self
.produce_count
.load(Ordering::Relaxed)
.checked_sub(self.consume_count.load(Ordering::Relaxed)) {
len
} else {
0
}
}
#[inline]
fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
self.producer.send(task);
self.produce_count.fetch_add(1, Ordering::Relaxed);
Ok(())
}
#[inline]
fn push_timed_out(&self, _index: u64, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
self.consumer.as_ref().borrow_mut().push_front(task);
self.produce_count.fetch_add(1, Ordering::Relaxed);
Ok(())
}
#[inline]
fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
self.push(task)
}
#[inline]
fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>> {
let task = self.consumer.as_ref().borrow_mut().try_recv();
if task.is_some() {
self.consume_count.fetch_add(1, Ordering::Relaxed);
}
task
}
#[inline]
fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>> {
let all = self.consumer.as_ref().borrow_mut().try_recv_all();
self.consume_count.fetch_add(all.len(), Ordering::Relaxed);
all.into_iter()
}
#[inline]
fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>> {
self.thread_waker.as_ref()
}
}
impl<O> AsyncTaskPoolExt<O> for SingleTaskPool<O>
where
    O: Default + 'static,
{
    /// Installs `thread_waker` as this pool's thread waker, discarding any
    /// waker that was set before.
    fn set_thread_waker(&mut self, thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) {
        let _previous = self.thread_waker.replace(thread_waker);
    }
}
/// Handle to the single-threaded asynchronous runtime.
///
/// All state lives in one shared tuple behind an `Arc`, so cloning the handle
/// is cheap and every clone refers to the same runtime.
pub struct SingleTaskRuntime<
O: Default + 'static = (),
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = SingleTaskPool<O>,
>(Arc<(
usize, // unique id of the runtime
Arc<P>, // asynchronous task pool
Sender<(usize, AsyncTimingTask<P, O>)>, // producer for sleeping asynchronous tasks
Mutex<AsyncTaskTimer<P, O>>, // local timer
)>);
// SAFETY: NOTE(review): these impls are unconditional - they do not require
// `P` or the timer to be `Send`/`Sync`. Soundness therefore depends on how
// the pool and timer are actually used by callers; confirm before relying on
// cross-thread access patterns beyond those exercised in this file.
unsafe impl<O, P> Send for SingleTaskRuntime<O, P>
where
    O: Default + 'static,
    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
{
}

unsafe impl<O, P> Sync for SingleTaskRuntime<O, P>
where
    O: Default + 'static,
    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
{
}
impl<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
> Clone for SingleTaskRuntime<O, P> {
fn clone(&self) -> Self {
SingleTaskRuntime(self.0.clone())
}
}
impl<O, P> AsyncRuntime<O> for SingleTaskRuntime<O, P>
where
    O: Default + 'static,
    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
{
    type Pool = P;

    /// Returns a shared handle to the task pool of this runtime.
    fn shared_pool(&self) -> Arc<Self::Pool> {
        (self.0).1.clone()
    }

    /// Returns the unique id of this runtime.
    fn get_id(&self) -> usize {
        (self.0).0
    }

    /// Returns the length reported by the local timer.
    fn wait_len(&self) -> usize {
        (self.0).3.lock().len()
    }

    /// Returns the number of tasks currently queued in the pool.
    fn len(&self) -> usize {
        (self.0).1.len()
    }

    /// Allocates a task id whose waker slot is empty (zero).
    fn alloc(&self) -> TaskId {
        TaskId(Arc::new(AtomicUsize::new(0)))
    }

    /// Wraps `future` in a task and pushes it onto the pool.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Other` error wrapping the pool's error when the
    /// push fails.
    fn spawn<F>(&self, task_id: TaskId, future: F) -> Result<()>
    where
        F: Future<Output = O> + 'static,
    {
        let task = Arc::new(AsyncTask::new(
            task_id,
            (self.0).1.clone(),
            Some(future.boxed_local()),
        ));
        (self.0)
            .1
            .push(task)
            .map_err(|e| Error::new(ErrorKind::Other, e))
    }

    /// Wraps `future` in a task and registers it with the local timer so it
    /// is run once `time` has elapsed.
    fn spawn_timing<F>(&self, task_id: TaskId, future: F, time: usize) -> Result<()>
    where
        F: Future<Output = O> + 'static,
    {
        // `task_id` is owned here, so it is moved into the task directly.
        let task = Arc::new(AsyncTask::new(
            task_id,
            (self.0).1.clone(),
            Some(future.boxed_local()),
        ));
        (self.0)
            .3
            .lock()
            .set_timer(AsyncTimingTask::WaitRun(task), time);
        Ok(())
    }

    /// Parks `waker` in the waker slot of `task_id` and reports
    /// `Poll::Pending`.
    ///
    /// The waker is boxed and its address is stored in the slot; `wakeup`
    /// later takes it back.
    fn pending<Output>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output> {
        let raw = Box::into_raw(Box::new(waker)) as usize;
        // Release so that the thread taking the pointer in `wakeup` also
        // observes the fully initialised boxed waker.
        // NOTE(review): a waker already parked in the slot and not yet
        // consumed by `wakeup` is overwritten and leaked here, as before.
        task_id.0.store(raw, Ordering::Release);
        Poll::Pending
    }

    /// Takes the waker parked for `task_id` and wakes it.
    ///
    /// # Panics
    ///
    /// Panics when no waker is parked for `task_id`, which includes a second
    /// `wakeup` without a `pending` in between.
    fn wakeup(&self, task_id: &TaskId) {
        // Take the pointer out of the slot atomically. Leaving it in place
        // would let a later `wakeup` rebuild the same `Box` and free it twice.
        match task_id.0.swap(0, Ordering::AcqRel) {
            0 => panic!("Single runtime wakeup task failed, reason: task id not exist"),
            ptr => {
                // SAFETY: within this impl the only non-zero values stored in
                // the slot come from `Box::into_raw` in `pending`, and the
                // swap above makes this call the sole owner of the pointer.
                let waker = unsafe { Box::from_raw(ptr as *mut Waker) };
                waker.wake();
            }
        }
    }

    /// Creates a waiter backed by a `wait_any` of capacity 2.
    fn wait<V: 'static>(&self) -> AsyncWait<V> {
        AsyncWait::new(self.wait_any(2))
    }

    /// Creates a wait-any object backed by a bounded channel of `capacity`.
    fn wait_any<V: 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
        let (producer, consumer) = async_bounded(capacity);
        AsyncWaitAny::new(capacity, producer, consumer)
    }

    /// Creates a wait-any-with-callback object backed by a bounded channel
    /// of `capacity`.
    fn wait_any_callback<V: 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
        let (producer, consumer) = async_bounded(capacity);
        AsyncWaitAnyCallback::new(capacity, producer, consumer)
    }

    /// Creates a map-reduce object backed by a bounded channel of `capacity`.
    fn map_reduce<V: 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
        let (producer, consumer) = async_bounded(capacity);
        AsyncMapReduce::new(0, capacity, producer, consumer)
    }

    /// Returns a future built from `AsyncWaitTimeout` for the given
    /// `timeout`, using this runtime and its timer producer.
    fn timeout(&self, timeout: usize) -> LocalBoxFuture<'static, ()> {
        let rt = self.clone();
        let timing_producer = (self.0).2.clone();
        AsyncWaitTimeout::new(rt, timing_producer, timeout).boxed_local()
    }

    /// Builds a stream that feeds every item of `input` through `filter`.
    ///
    /// `Filtered` results are yielded; the first `Disconnect` ends the stream.
    fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> LocalBoxStream<'static, FO>
    where
        S: Stream<Item = SO> + 'static,
        SO: 'static,
        F: FnMut(SO) -> AsyncPipelineResult<FO> + 'static,
        FO: 'static,
    {
        let output = stream! {
            for await value in input {
                match filter(value) {
                    AsyncPipelineResult::Disconnect => {
                        break;
                    },
                    AsyncPipelineResult::Filtered(result) => {
                        yield result;
                    },
                }
            }
        };
        output.boxed_local()
    }

    /// Closing is not supported by this runtime; always returns `false`.
    fn close(&self) -> bool {
        false
    }
}
impl<O, P> AsyncRuntimeExt<O> for SingleTaskRuntime<O, P>
where
    O: Default + 'static,
    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
{
    /// Wraps `future` and `context` in a task and pushes it onto the pool.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Other` error wrapping the pool's error when the
    /// push fails.
    fn spawn_with_context<F, C>(&self, task_id: TaskId, future: F, context: C) -> Result<()>
    where
        F: Future<Output = O> + 'static,
        C: 'static,
    {
        let task = Arc::new(AsyncTask::with_context(
            task_id,
            (self.0).1.clone(),
            Some(future.boxed_local()),
            context,
        ));
        (self.0)
            .1
            .push(task)
            .map_err(|e| Error::new(ErrorKind::Other, e))
    }

    /// Wraps `future` and `context` in a task and registers it with the
    /// local timer so it is run once `time` has elapsed.
    fn spawn_timing_with_context<F, C>(
        &self,
        task_id: TaskId,
        future: F,
        context: C,
        time: usize,
    ) -> Result<()>
    where
        F: Future<Output = O> + 'static,
        C: 'static,
    {
        let task = Arc::new(AsyncTask::with_context(
            task_id,
            (self.0).1.clone(),
            Some(future.boxed_local()),
            context,
        ));
        (self.0)
            .3
            .lock()
            .set_timer(AsyncTimingTask::WaitRun(task), time);
        Ok(())
    }

    /// Spawns `future` on this runtime and drives the runtime on the calling
    /// thread until the future has produced its output.
    ///
    /// Each round handles expired timer entries, runs at most one queued
    /// task, and then checks whether the result has arrived. After the first
    /// three rounds the loop backs off through `spin`.
    ///
    /// # Errors
    ///
    /// Fails when the wrapper task cannot be spawned, or when the result
    /// channel is disconnected before a value was sent.
    fn block_on<F>(&self, future: F) -> Result<F::Output>
    where
        F: Future + 'static,
        <F as Future>::Output: Default + 'static,
    {
        let (sender, receiver) = bounded(1);
        self.spawn(self.alloc(), async move {
            let output = future.await;
            // The receiver may already be gone; there is nobody left to
            // notify in that case.
            let _ = sender.send(output);
            Default::default()
        })
        .map_err(|e| {
            Error::new(
                ErrorKind::Other,
                format!("Block on failed, reason: {:?}", e),
            )
        })?;

        // Saturating `usize` counter: the previous untyped counter defaulted
        // to `i32` and could overflow while waiting on a long-running future.
        let mut rounds: usize = 0;
        let mut spin_len = 1;
        loop {
            rounds = rounds.saturating_add(1);
            if rounds > 3 {
                spin_len = spin(spin_len);
            }

            (self.0).3.lock().consume();
            loop {
                // Every timer access is its own statement so the lock guard
                // is released before any task is woken or run.
                let current_time = (self.0).3.lock().is_require_pop();
                let current_time = match current_time {
                    Some(time) => time,
                    None => break,
                };

                let timed_out = (self.0).3.lock().pop(current_time);
                if let Some((handle, timing_task)) = timed_out {
                    match timing_task {
                        AsyncTimingTask::Pended(expired) => self.wakeup(&expired),
                        AsyncTimingTask::WaitRun(expired) => {
                            let _ = (self.0).1.push_timed_out(handle as u64, expired);
                        }
                    }
                    if let Some(task) = (self.0).1.try_pop() {
                        run_task(task);
                    }
                }
            }

            if let Some(task) = (self.0).1.try_pop() {
                run_task(task);
            }

            match receiver.try_recv() {
                Ok(result) => return Ok(result),
                Err(e) => {
                    // An empty channel only means "not finished yet".
                    if e.is_disconnected() {
                        return Err(Error::new(
                            ErrorKind::Other,
                            format!("Block on failed, reason: {:?}", e),
                        ));
                    }
                }
            }
        }
    }
}
impl<O, P> SingleTaskRuntime<O, P>
where
    O: Default + 'static,
    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
{
    /// Builds a `LocalAsyncRuntime` from a raw pointer to this runtime and
    /// the raw entry points defined below.
    ///
    /// Each call hands out a new strong reference through `as_raw`.
    pub fn to_local_runtime(&self) -> LocalAsyncRuntime<O> {
        LocalAsyncRuntime::new(
            self.as_raw(),
            Self::get_id_raw,
            Self::spawn_raw,
            Self::spawn_timing_raw,
            Self::timeout_raw,
        )
    }

    /// Returns a raw pointer to the shared runtime state.
    ///
    /// The pointer owns one strong reference, which is released only when it
    /// is turned back into a runtime with `from_raw` and dropped.
    #[inline]
    pub(crate) fn as_raw(&self) -> *const () {
        Arc::into_raw(self.0.clone()) as *const ()
    }

    /// Rebuilds a runtime handle from a pointer produced by `as_raw`.
    ///
    /// The returned handle takes over the strong reference owned by `raw`.
    /// `raw` must come from `as_raw` of a runtime with the same `O` and `P`.
    #[inline]
    pub(crate) fn from_raw(raw: *const ()) -> Self {
        // SAFETY: relies on the caller contract above - `raw` was produced by
        // `Arc::into_raw` on exactly this tuple type.
        let inner = unsafe {
            Arc::from_raw(raw as *const (
                usize,
                Arc<P>,
                Sender<(usize, AsyncTimingTask<P, O>)>,
                Mutex<AsyncTaskTimer<P, O>>,
            ))
        };
        SingleTaskRuntime(inner)
    }

    /// Raw entry point: returns the id of the runtime behind `raw`.
    pub(crate) fn get_id_raw(raw: *const ()) -> usize {
        // `ManuallyDrop` keeps the reference owned by `raw` alive on every
        // exit path, including unwinding.
        let rt = std::mem::ManuallyDrop::new(Self::from_raw(raw));
        rt.get_id()
    }

    /// Raw entry point: spawns `future` on the runtime behind `raw`.
    pub(crate) fn spawn_raw(raw: *const (), future: LocalBoxFuture<'static, O>) -> Result<()> {
        // Borrow the runtime without consuming the reference owned by `raw`.
        let rt = std::mem::ManuallyDrop::new(Self::from_raw(raw));
        rt.spawn(rt.alloc(), future)
    }

    /// Raw entry point: spawns `future` with a delay of `timeout` on the
    /// runtime behind `raw`.
    pub(crate) fn spawn_timing_raw(
        raw: *const (),
        future: LocalBoxFuture<'static, O>,
        timeout: usize,
    ) -> Result<()> {
        // Borrow the runtime without consuming the reference owned by `raw`.
        let rt = std::mem::ManuallyDrop::new(Self::from_raw(raw));
        rt.spawn_timing(rt.alloc(), future, timeout)
    }

    /// Raw entry point: creates a timeout future on the runtime behind `raw`.
    pub(crate) fn timeout_raw(raw: *const (), timeout: usize) -> LocalBoxFuture<'static, ()> {
        // Borrow the runtime without consuming the reference owned by `raw`.
        let rt = std::mem::ManuallyDrop::new(Self::from_raw(raw));
        rt.timeout(timeout)
    }
}
/// Driver for a `SingleTaskRuntime`: owns the runtime handle and the flag
/// recording whether it has been started.
pub struct SingleTaskRunner<
    O: Default + 'static,
    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = SingleTaskPool<O>,
> {
    /// Set by `startup`; `run` and `run_once` refuse to work while it is
    /// `false`.
    is_running: AtomicBool,
    /// The runtime driven by this runner.
    runtime: SingleTaskRuntime<O, P>,
}
// SAFETY: NOTE(review): like the runtime it wraps, the runner is declared
// `Send` and `Sync` without any bound on `P`; soundness depends on how the
// pool is actually used - confirm against the callers.
unsafe impl<O, P> Send for SingleTaskRunner<O, P>
where
    O: Default + 'static,
    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
{
}

unsafe impl<O, P> Sync for SingleTaskRunner<O, P>
where
    O: Default + 'static,
    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
{
}
impl<O> Default for SingleTaskRunner<O>
where
    O: Default + 'static,
{
    /// Creates a runner on top of a default `SingleTaskPool`.
    fn default() -> Self {
        let pool = SingleTaskPool::<O>::default();
        Self::new(pool)
    }
}
impl<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
> SingleTaskRunner<O, P> {
pub fn new(pool: P) -> Self {
let rt_uid = pool.get_thread_id();
let pool = Arc::new(pool);
let timer = AsyncTaskTimer::new();
let producor = timer.get_producor().clone();
let timer = Mutex::new(timer);
let runtime = SingleTaskRuntime(Arc::new((
rt_uid,
pool,
producor,
timer,
)));
SingleTaskRunner {
is_running: AtomicBool::new(false),
runtime,
}
}
pub fn get_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
(self.runtime.0).1.get_thread_waker().cloned()
}
pub fn startup(&self) -> Option<SingleTaskRuntime<O, P>> {
if cfg!(target_arch = "aarch64") {
match self
.is_running
.compare_exchange(false,
true,
Ordering::SeqCst,
Ordering::SeqCst) {
Ok(false) => {
Some(self.runtime.clone())
}
_ => {
None
}
}
} else {
match self
.is_running
.compare_exchange_weak(false,
true,
Ordering::SeqCst,
Ordering::SeqCst) {
Ok(false) => {
Some(self.runtime.clone())
}
_ => {
None
}
}
}
}
pub fn run_once(&self) -> Result<usize> {
if !self.is_running.load(Ordering::Relaxed) {
return Err(Error::new(ErrorKind::Other, "Single thread runtime not running"));
}
(self.runtime.0).3.lock().consume(); loop {
let current_time = (self.runtime.0).3.lock().is_require_pop(); if let Some(current_time) = current_time {
let timed_out = (self.runtime.0).3.lock().pop(current_time); if let Some((handle, timing_task)) = timed_out {
match timing_task {
AsyncTimingTask::Pended(expired) => {
self.runtime.wakeup(&expired);
if let Some(task) = (self.runtime.0).1.try_pop() {
run_task(task);
}
},
AsyncTimingTask::WaitRun(expired) => {
(self.runtime.0).1.push_timed_out(handle as u64, expired);
if let Some(task) = (self.runtime.0).1.try_pop() {
run_task(task);
}
},
}
}
} else {
break;
}
}
match (self.runtime.0).1.try_pop() {
None => {
return Ok(0);
},
Some(task) => {
run_task(task);
},
}
Ok((self.runtime.0).1.len())
}
pub fn run(&self) -> Result<usize> {
if !self.is_running.load(Ordering::Relaxed) {
return Err(Error::new(ErrorKind::Other, "Single thread runtime not running"));
}
let mut tasks = (self.runtime.0).1.try_pop_all();
(self.runtime.0).3.lock().consume(); loop {
let current_time = (self.runtime.0).3.lock().is_require_pop(); if let Some(current_time) = current_time {
loop {
let timed_out = (self.runtime.0).3.lock().pop(current_time); if let Some((handle, timing_task)) = timed_out {
match timing_task {
AsyncTimingTask::Pended(expired) => {
self.runtime.wakeup(&expired);
if let Some(task) = (self.runtime.0).1.try_pop() {
run_task(task);
}
},
AsyncTimingTask::WaitRun(expired) => {
(self.runtime.0).1.push_timed_out(handle as u64, expired);
if let Some(task) = (self.runtime.0).1.try_pop() {
run_task(task);
}
},
}
if let Some(task) = tasks.next() {
run_task(task);
}
} else {
break;
}
}
} else {
break;
}
}
loop {
if let Some(task) = tasks.next() {
run_task(task);
} else {
return Ok((self.runtime.0).1.len());
}
}
}
pub fn into_local(self) -> SingleTaskRuntime<O, P> {
self.runtime
}
}
#[inline]
fn run_task<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>(task: Arc<AsyncTask<P, O>>) {
let waker = waker_ref(&task);
let mut context = Context::from_waker(&*waker);
if let Some(mut future) = task.get_inner() {
if let Poll::Pending = future.as_mut().poll(&mut context) {
task.set_inner(Some(future));
}
}
}
/// Manual stress test: drives a runtime on a background thread, exercises
/// `spawn_local`, `block_on` and the local dictionary, then spawns ten
/// million counter tasks from four threads. The counter prints its total and
/// elapsed time when it is dropped.
#[test]
fn test_single_runtime() {
    use std::mem;
    use std::thread;
    use std::time::{Duration, Instant};
    use crate::rt::{spawn_local,
                    get_local_dict,
                    get_local_dict_mut,
                    set_local_dict,
                    remove_local_dict,
                    clear_local_dict};

    struct AtomicCounter(AtomicUsize, Instant);
    impl Drop for AtomicCounter {
        fn drop(&mut self) {
            println!("!!!!!!drop counter, count: {:?}, time: {:?}", self.0.load(Ordering::Relaxed), Instant::now() - self.1);
        }
    }

    // Same construction as `SingleTaskPool::default`, but without a thread waker.
    let rt_uid = alloc_rt_uid();
    let (producer, consumer) = mpsc_deque();
    let pool = SingleTaskPool {
        id: ((rt_uid << 8) & 0xffff) | 1,
        consumer: Arc::new(RefCell::new(consumer)),
        producer: Arc::new(producer),
        consume_count: Arc::new(AtomicUsize::new(0)),
        produce_count: Arc::new(AtomicUsize::new(0)),
        thread_waker: None,
    };

    let runner = SingleTaskRunner::new(pool);
    let rt = runner.startup().unwrap();

    // Background thread that keeps driving the runtime.
    let runner_rt = rt.clone();
    thread::spawn(move || {
        bind_local_thread(runner_rt.to_local_runtime());
        loop {
            if let Err(e) = runner.run() {
                println!("!!!!!!run failed, reason: {:?}", e);
                break;
            }
            thread::sleep(Duration::from_millis(1));
        }
    });

    let _ = rt.spawn(rt.alloc(), async move {
        if let Err(e) = spawn_local(async move {
            println!("Test spawn local ok");
        }) {
            println!("Test spawn local failed, reason: {:?}", e);
        }
    });

    let block_on_rt = rt.clone();
    let thread_handle = thread::spawn(move || {
        let outcome = block_on_rt.block_on(async move {
            set_local_dict::<usize>(0);
            println!("get local dict, init value: {}", *get_local_dict::<usize>().unwrap());
            *get_local_dict_mut::<usize>().unwrap() = 0xffffffff;
            println!("get local dict, value after modify: {}", *get_local_dict::<usize>().unwrap());
            if let Some(value) = remove_local_dict::<usize>() {
                println!("get local dict, value after remove: {:?}, last value: {}", get_local_dict::<usize>(), value);
            }
            set_local_dict::<usize>(0);
            clear_local_dict();
            println!("get local dict, value after clear: {:?}", get_local_dict::<usize>());
            "Test block on ok".to_string()
        });
        match outcome {
            Ok(r) => {
                println!("{}", r);
            },
            Err(e) => {
                println!("Test block on failed, reason: {:?}", e);
            },
        }
    });
    let _ = thread_handle.join();

    // Four spawner threads share one counter; the original handle is dropped
    // up front so the counter is released by whichever clone goes last.
    let counter = Arc::new(AtomicCounter(AtomicUsize::new(0), Instant::now()));
    let worker_counters: Vec<_> = (0..4).map(|_| counter.clone()).collect();
    mem::drop(counter);

    for worker_counter in worker_counters {
        let worker_rt = rt.clone();
        thread::spawn(move || {
            let start = Instant::now();
            for _ in 0..2500000 {
                let counter_copy = worker_counter.clone();
                if let Err(e) = worker_rt.spawn(worker_rt.alloc(), async move {
                    counter_copy.0.fetch_add(1, Ordering::Relaxed);
                }) {
                    println!("!!!> spawn singale task failed, reason: {:?}", e);
                }
            }
            println!("!!!!!!spawn single task ok, time: {:?}", Instant::now() - start);
        });
    }

    // Keeps the process alive while the background threads work.
    thread::sleep(Duration::from_millis(1000000000));
}
/// Manual benchmark: runs ten million trivial futures through `block_on` on
/// a runtime that is driven only by the calling thread.
#[test]
pub fn test_single_runtime_block_on() {
    use std::time::Instant;
    use std::ops::Drop;
    use std::sync::atomic::AtomicUsize;
    use crate::rt::serial::AsyncRuntimeExt;

    struct AtomicCounter(AtomicUsize, Instant);
    impl Drop for AtomicCounter {
        fn drop(&mut self) {
            println!("!!!!!!drop counter, count: {:?}, time: {:?}", self.0.load(Ordering::Relaxed), Instant::now() - self.1);
        }
    }

    let rt = SingleTaskRunner::<(), SingleTaskPool<()>>::new(SingleTaskPool::default()).into_local();
    let counter = Arc::new(AtomicCounter(AtomicUsize::new(0), Instant::now()));

    let start = Instant::now();
    for _ in 0..10000000 {
        let counter_copy = counter.clone();
        let _ = rt.block_on(async move {
            counter_copy.0.fetch_add(1, Ordering::Relaxed)
        });
    }
    println!("!!!!!!spawn single task ok, time: {:?}", Instant::now() - start);
}