use std::thread;
use std::vec::IntoIter;
use std::future::Future;
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::task::{Context, Poll, Waker};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::io::{Error, Result, ErrorKind};
use std::sync::atomic::AtomicUsize;
use std::time::Instant;
use async_stream::stream;
use flume::bounded as async_bounded;
use futures::{
future::{FutureExt, LocalBoxFuture},
stream::{LocalBoxStream, Stream, StreamExt},
task::{waker_ref, ArcWake},
};
use crate::{rt::{DEFAULT_MAX_HIGH_PRIORITY_BOUNDED, TaskId, AsyncPipelineResult, YieldNow, alloc_rt_uid,
serial::{AsyncRuntime, AsyncRuntimeExt, AsyncTaskPool, AsyncTaskPoolExt, AsyncTask, AsyncMapReduce, AsyncWait, AsyncWaitAny, AsyncWaitAnyCallback}}};
pub struct LocalTaskPool<O: Default + 'static> {
inner: UnsafeCell<VecDeque<Arc<AsyncTask<Self, O>>>>, }
unsafe impl<O: Default + 'static> Sync for LocalTaskPool<O> {}
impl<O: Default + 'static> Default for LocalTaskPool<O> {
fn default() -> Self {
LocalTaskPool {
inner: UnsafeCell::new(VecDeque::default()),
}
}
}
impl<O: Default + 'static> AsyncTaskPool<O> for LocalTaskPool<O> {
type Pool = LocalTaskPool<O>;
#[inline]
fn get_thread_id(&self) -> usize {
0
}
#[inline]
fn len(&self) -> usize {
unsafe {
(&*self.inner.get()).len()
}
}
#[inline]
fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
self.push_local(task)
}
#[inline]
fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
unsafe {
(&mut *self.inner.get()).push_back(task);
Ok(())
}
}
#[inline]
fn push_priority(&self,
_priority: usize,
task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
self.push_local(task)
}
#[inline]
fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
self.push_local(task)
}
#[inline]
fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>> {
unsafe {
(&mut *self.inner.get()).pop_front()
}
}
#[inline]
fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>> {
let mut all = Vec::with_capacity(self.len());
let internal = unsafe { &mut *self.inner.get() };
for _ in 0..internal.len() {
if let Some(task) = internal.pop_front() {
all.push(task);
}
}
all.into_iter()
}
}
impl<O: Default + 'static> AsyncTaskPoolExt<O> for LocalTaskPool<O> {
fn worker_len(&self) -> usize {
1
}
}
impl<O: Default + 'static> LocalTaskPool<O> {
pub fn new() -> Self {
Self::default()
}
#[inline]
pub(crate) fn internal_len(&self) -> usize {
unsafe {
(&*self.inner.get()).len()
}
}
#[inline]
pub(crate) fn will_wakeup(&self, task: Arc<AsyncTask<Self, O>>) {
unsafe {
(&mut *self.inner.get()).push_back(task);
}
}
}
pub struct LocalTaskRuntime<O: Default + 'static = ()>(Arc<InnerLocalTaskRuntime<O>>);
struct InnerLocalTaskRuntime<O: Default + 'static = ()> {
uid: usize, running: Arc<AtomicBool>, pool: Arc<LocalTaskPool<O>>, }
impl<O: Default + 'static> Clone for LocalTaskRuntime<O> {
fn clone(&self) -> Self {
LocalTaskRuntime(self.0.clone())
}
}
impl<O: Default + 'static> AsyncRuntime<O> for LocalTaskRuntime<O> {
type Pool = LocalTaskPool<O>;
fn shared_pool(&self) -> Arc<Self::Pool> {
self.0.pool.clone()
}
fn get_id(&self) -> usize {
self.0.uid
}
fn wait_len(&self) -> usize {
self.0.pool.len()
}
fn len(&self) -> usize {
self.wait_len()
}
fn alloc<R: 'static>(&self) -> TaskId {
TaskId(UnsafeCell::new(0))
}
fn spawn<F>(&self, future: F) -> Result<TaskId>
where
F: Future<Output = O> + 'static {
let task_id = self.alloc::<F::Output>();
if let Err(e) = self.spawn_by_id(task_id.clone(), future) {
return Err(e);
}
Ok(task_id)
}
fn spawn_local<F>(&self, future: F) -> Result<TaskId>
where
F: Future<Output = O> + 'static {
let task_id = self.alloc::<F::Output>();
if let Err(e) = self.spawn_local_by_id(task_id.clone(), future) {
return Err(e);
}
Ok(task_id)
}
fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
where
F: Future<Output = O> + 'static {
let task_id = self.alloc::<F::Output>();
if let Err(e) = self.spawn_priority_by_id(task_id.clone(), priority, future) {
return Err(e);
}
Ok(task_id)
}
fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
where
F: Future<Output = O> + 'static {
let task_id = self.alloc::<F::Output>();
if let Err(e) = self.spawn_yield_by_id(task_id.clone(), future) {
return Err(e);
}
Ok(task_id)
}
fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
where
F: Future<Output = O> + 'static {
let task_id = self.alloc::<F::Output>();
if let Err(e) = self.spawn_timing_by_id(task_id.clone(), future, time) {
return Err(e);
}
Ok(task_id)
}
fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
where
F: Future<Output = O> + 'static {
self.spawn_local_by_id(task_id, future)
}
fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
where
F: Future<Output = O> + 'static {
if let Err(e) = self.0.pool.push_local(Arc::new(AsyncTask::new(
task_id,
self.0.pool.clone(),
DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
Some(future.boxed_local()),
))) {
return Err(Error::new(ErrorKind::Other, e));
}
Ok(())
}
fn spawn_priority_by_id<F>(&self,
task_id: TaskId,
_priority: usize,
future: F) -> Result<()>
where
F: Future<Output = O> + 'static {
if let Err(e) = self.0.pool.push_priority(DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
Arc::new(AsyncTask::new(
task_id,
self.0.pool.clone(),
DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
Some(future.boxed_local()),
))) {
return Err(Error::new(ErrorKind::Other, e));
}
Ok(())
}
#[inline]
fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
where
F: Future<Output = O> + 'static {
self.spawn_priority_by_id(task_id,
DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
future)
}
fn spawn_timing_by_id<F>(&self,
_task_id: TaskId,
_future: F,
_time: usize) -> Result<()>
where
F: Future<Output = O> + 'static {
Err(Error::new(ErrorKind::Other, "unimplemented"))
}
fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output> {
unimplemented!()
}
fn wakeup<Output: 'static>(&self, task_id: &TaskId) {
unimplemented!()
}
fn wait<V: 'static>(&self) -> AsyncWait<V> {
AsyncWait::new(self.wait_any(2))
}
fn wait_any<V: 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
let (producor, consumer) = async_bounded(capacity);
AsyncWaitAny::new(capacity, producor, consumer)
}
fn wait_any_callback<V: 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
let (producor, consumer) = async_bounded(capacity);
AsyncWaitAnyCallback::new(capacity, producor, consumer)
}
fn map_reduce<V: 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
let (producor, consumer) = async_bounded(capacity);
AsyncMapReduce::new(0, capacity, producor, consumer)
}
fn timeout(&self, _timeout: usize) -> LocalBoxFuture<'static, ()> {
unimplemented!()
}
fn yield_now(&self) -> LocalBoxFuture<'static, ()> {
async move {
YieldNow(false).await;
}.boxed_local()
}
fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> LocalBoxStream<'static, FO>
where
S: Stream<Item = SO> + 'static,
SO: 'static,
F: FnMut(SO) -> AsyncPipelineResult<FO> + 'static,
FO: 'static,
{
let output = stream! {
for await value in input {
match filter(value) {
AsyncPipelineResult::Disconnect => {
break;
},
AsyncPipelineResult::Filtered(result) => {
yield result;
},
}
}
};
output.boxed_local()
}
fn close(&self) -> bool {
if cfg!(target_arch = "aarch64") {
if let Ok(true) =
self
.0
.running
.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
{
true
} else {
false
}
} else {
if let Ok(true) =
self
.0
.running
.compare_exchange_weak(true, false, Ordering::SeqCst, Ordering::SeqCst)
{
true
} else {
false
}
}
}
}
impl<O: Default + 'static> AsyncRuntimeExt<O> for LocalTaskRuntime<O> {
fn spawn_with_context<F, C>(&self,
_task_id: TaskId,
_future: F,
_context: C) -> Result<()>
where F: Future<Output = O> + 'static,
C: 'static {
Err(Error::new(ErrorKind::Other, "unimplemented"))
}
fn spawn_timing_with_context<F, C>(&self,
task_id: TaskId,
future: F,
context: C,
time: usize) -> Result<()>
where F: Future<Output = O> + 'static,
C: 'static {
Err(Error::new(ErrorKind::Other, "unimplemented"))
}
fn block_on<F>(&self, future: F) -> Result<F::Output>
where F: Future + 'static,
<F as Future>::Output: Default + 'static {
let runner = LocalTaskRunner(self.clone());
let mut result: Option<<F as Future>::Output> = None;
let result_ptr = (&mut result) as *mut Option<<F as Future>::Output>;
self.spawn_local(async move {
let r = future.await;
unsafe {
*result_ptr = Some(r);
}
Default::default()
});
loop {
while self.internal_len() > 0 {
runner.run_once();
}
if let Some(result) = result.take() {
return Ok(result);
}
}
}
}
impl<O: Default + 'static> LocalTaskRuntime<O> {
#[inline]
pub fn is_running(&self) -> bool {
self
.0
.running
.load(Ordering::Relaxed)
}
#[inline]
pub fn internal_len(&self) -> usize {
self.0.pool.internal_len()
}
#[inline]
pub(crate) fn will_wakeup(&self, task: Arc<AsyncTask<<Self as AsyncRuntime<O>>::Pool, O>>) {
self.0.pool.will_wakeup(task);
}
pub fn send<F>(&self, future: F)
where
F: Future<Output = O> + 'static,
{
let task_id = self.alloc::<F::Output>();
self.0.pool.push(Arc::new(AsyncTask::new(
task_id,
self.0.pool.clone(),
DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
Some(future.boxed_local()),
)));
}
}
pub struct LocalTaskRunner<O: Default + 'static = ()>(LocalTaskRuntime<O>);
unsafe impl<O: Default + 'static> Send for LocalTaskRunner<O> {}
impl<O: Default + 'static> !Sync for LocalTaskRunner<O> {}
impl<O: Default + 'static> LocalTaskRunner<O> {
pub fn new() -> Self {
let inner = InnerLocalTaskRuntime {
uid: alloc_rt_uid(),
running: Arc::new(AtomicBool::new(false)),
pool: Arc::new(LocalTaskPool::new()),
};
LocalTaskRunner(LocalTaskRuntime(Arc::new(inner)))
}
pub fn get_runtime(&self) -> LocalTaskRuntime<O> {
self.0.clone()
}
pub fn startup(self,
thread_name: &str,
thread_stack_size: usize) -> LocalTaskRuntime<O> {
let rt = self.get_runtime();
let rt_copy = rt.clone();
let _ = thread::Builder::new()
.name(thread_name.to_string())
.stack_size(thread_stack_size)
.spawn(move || {
rt_copy
.0
.running
.store(true, Ordering::Relaxed);
while rt_copy.is_running() {
self.run_once();
}
});
rt
}
#[inline]
pub fn run_once(&self) {
unsafe {
if let Some(task) = (self.0).0.pool.try_pop() {
let waker = waker_ref(&task);
let mut context = Context::from_waker(&*waker);
if let Some(mut future) = task.get_inner() {
if let Poll::Pending = future.as_mut().poll(&mut context) {
task.set_inner(Some(future));
}
}
}
}
}
pub fn into_local(self) -> LocalTaskRuntime<O> {
self.0
}
}
#[test]
fn test_local_compatible_wasm_runtime_block_on() {
struct AtomicCounter(AtomicUsize, Instant);
impl Drop for AtomicCounter {
fn drop(&mut self) {
{
println!(
"!!!!!!drop counter, count: {:?}, time: {:?}",
self.0.load(Ordering::Relaxed),
Instant::now() - self.1
);
}
}
}
let rt = LocalTaskRunner::<()>::new().into_local();
let counter = Arc::new(AtomicCounter(AtomicUsize::new(0), Instant::now()));
for _ in 0..10000000 {
let counter_copy = counter.clone();
let _ = rt.block_on(async move { counter_copy.0.fetch_add(1, Ordering::Relaxed) });
}
}