use std::thread;
use std::task::Context;
use std::future::Future;
use std::cell::UnsafeCell;
use std::task::Poll::Pending;
use std::collections::VecDeque;
use std::io::Result as IOResult;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::sync::atomic::AtomicUsize;
use std::time::{Duration, Instant};
use async_stream::stream;
use crossbeam_queue::SegQueue;
use flume::bounded as async_bounded;
use futures::{
future::{FutureExt, LocalBoxFuture},
stream::{LocalBoxStream, Stream, StreamExt},
task::{waker_ref, ArcWake},
};
#[cfg(not(target_arch = "wasm32"))]
use polling::{Events, Poller};
use crate::{
rt::{
alloc_rt_uid,
serial::{AsyncMapReduce, AsyncWait, AsyncWaitAny, AsyncWaitAnyCallback},
AsyncPipelineResult, YieldNow
},
};
/// A task spawned on a local (single-threaded) runtime: the task's boxed
/// future plus a handle back to the runtime that owns it, so that waking
/// the task can re-enqueue it for polling.
pub(crate) struct LocalTask<O: Default + 'static = ()> {
    // The task's future. `None` after completion, and temporarily `None`
    // while the driver has taken it out via `get_inner` to poll it.
    inner: UnsafeCell<Option<LocalBoxFuture<'static, O>>>,
    // Owning runtime; `wake_by_ref` pushes the task back onto its queue.
    runtime: LocalTaskRuntime<O>,
}
// SAFETY: NOTE(review) — `LocalBoxFuture` is neither `Send` nor `Sync`; these
// impls assert that although `Arc<LocalTask>` may cross threads for wakeups,
// the `inner` cell is only ever accessed from the runtime's driver thread.
// Confirm every call site upholds that invariant.
unsafe impl<O: Default + 'static> Send for LocalTask<O> {}
unsafe impl<O: Default + 'static> Sync for LocalTask<O> {}
impl<O: Default + 'static> ArcWake for LocalTask<O> {
    /// Waking a task hands a fresh `Arc` clone of it back to the owning
    /// runtime, which queues it to be polled again.
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let rt = &arc_self.runtime;
        rt.will_wakeup(Arc::clone(arc_self));
    }
}
impl<O: Default + 'static> LocalTask<O> {
    /// Takes the boxed future out of the task, leaving `None` behind.
    /// Returns `None` if the task has already completed (or is being polled).
    pub fn get_inner(&self) -> Option<LocalBoxFuture<'static, O>> {
        // SAFETY: NOTE(review) — relies on the cell only being accessed from
        // the runtime's driver thread; confirm no concurrent access.
        unsafe { (*self.inner.get()).take() }
    }

    /// Stores (or clears, with `None`) the task's boxed future.
    pub fn set_inner(&self, inner: Option<LocalBoxFuture<'static, O>>) {
        // SAFETY: same single-threaded access assumption as `get_inner`.
        unsafe { *self.inner.get() = inner }
    }
}
/// Handle to a single-threaded local task runtime (non-wasm build).
/// Cloning is cheap: all handles share the same inner state via `Arc`.
#[cfg(not(target_arch = "wasm32"))]
pub struct LocalTaskRuntime<O: Default + 'static = ()>(
    Arc<(
        usize,                                   // unique runtime id
        Arc<AtomicBool>,                         // running flag
        SegQueue<Arc<LocalTask<O>>>,             // external (cross-thread) task queue
        UnsafeCell<VecDeque<Arc<LocalTask<O>>>>, // internal (driver-thread-only) task queue
        Option<AtomicBool>,                      // coalesced-wakeup ("sleeping") flag
        Option<Arc<Poller>>,                     // poller for blocking wait and cross-thread wakeup
    )>,
);
/// Handle to a single-threaded local task runtime (wasm build).
/// Identical to the non-wasm layout minus the poller slot, since wasm
/// has no blocking wait / cross-thread notification.
#[cfg(target_arch = "wasm32")]
pub struct LocalTaskRuntime<O: Default + 'static = ()>(
    Arc<(
        usize,                                   // unique runtime id
        Arc<AtomicBool>,                         // running flag
        SegQueue<Arc<LocalTask<O>>>,             // external (cross-thread) task queue
        UnsafeCell<VecDeque<Arc<LocalTask<O>>>>, // internal (driver-thread-only) task queue
        Option<AtomicBool>,                      // coalesced-wakeup ("sleeping") flag
    )>,
);
// SAFETY: NOTE(review) — a runtime handle may be moved to the thread that will
// drive it, but the `UnsafeCell` internal queue must only be touched from that
// one thread. The negative `!Sync` impl (nightly `negative_impls`) forbids
// sharing a handle by reference across threads, which is what makes the
// `Send` assertion tenable. Confirm all `unsafe` queue accesses respect this.
unsafe impl<O: Default + 'static> Send for LocalTaskRuntime<O> {}
impl<O: Default + 'static> !Sync for LocalTaskRuntime<O> {}
impl<O: Default + 'static> Clone for LocalTaskRuntime<O> {
fn clone(&self) -> Self {
LocalTaskRuntime(self.0.clone())
}
}
impl<O: Default + 'static> LocalTaskRuntime<O> {
    /// Whether the runtime's driver thread is currently running.
    #[inline]
    pub fn is_running(&self) -> bool {
        self.0.1.load(Ordering::Relaxed)
    }

    /// The runtime's unique id (allocated at construction by `alloc_rt_uid`).
    pub fn get_id(&self) -> usize {
        self.0.0
    }

    /// Total queued tasks: external queue plus internal queue.
    // NOTE(review): this `unsafe` block is unnecessary — both calls are safe —
    // and will trigger an `unused_unsafe` warning; consider removing it.
    pub fn len(&self) -> usize {
        unsafe {
            (self.0).2.len() + self.internal_len()
        }
    }

    /// Number of tasks in the driver-thread-only internal queue.
    #[inline]
    pub(crate) fn internal_len(&self) -> usize {
        // SAFETY: NOTE(review) — assumes only the driver thread dereferences
        // the internal queue's `UnsafeCell`; confirm no cross-thread callers.
        unsafe {
            (&*self.0.3.get()).len()
        }
    }

    /// Spawns a future directly onto the internal queue.
    /// Must be called from the runtime's own thread (unsynchronized access
    /// to the internal `UnsafeCell` queue).
    pub fn spawn<F>(&self, future: F)
    where
        F: Future<Output = O> + 'static,
    {
        unsafe {
            (&mut *(self.0).3.get()).push_back(Arc::new(LocalTask {
                inner: UnsafeCell::new(Some(future.boxed_local())),
                runtime: self.clone(),
            }));
        }
    }

    /// Re-queues an awakened task onto the lock-free external queue
    /// (safe to call from any thread; used by `ArcWake::wake_by_ref`).
    #[inline]
    pub(crate) fn will_wakeup(&self, task: Arc<LocalTask<O>>) {
        self.0.2.push(task);
    }

    /// Sends a future to this runtime from any thread via the external queue;
    /// on non-wasm targets, additionally wakes the driver thread if it is
    /// sleeping inside the poller.
    pub fn send<F>(&self, future: F)
    where
        F: Future<Output = O> + 'static,
    {
        self.0.2.push(Arc::new(LocalTask {
            inner: UnsafeCell::new(Some(future.boxed_local())),
            runtime: self.clone(),
        }));
        // Coalesced wakeup: only the sender that flips the flag false->true
        // pays for a poller notify; concurrent senders skip the syscall.
        // NOTE(review): the flag is named "sleeping" here but is *set* by the
        // sender and cleared by the sleeper after waking (see `try_sleep`) —
        // it effectively means "a notify is pending"; verify the intended
        // protocol against `try_sleep`.
        #[cfg(not(target_arch = "wasm32"))]
        if let Some(sleeping) = &self.0.4 {
            if sleeping.compare_exchange(
                false,
                true,
                Ordering::AcqRel,
                Ordering::Relaxed).is_ok()
            {
                // Poller was created together with the flag (see
                // `LocalTaskRunner::with_poll`), so `unwrap` cannot fail here.
                let _ = self
                    .0
                    .5
                    .as_ref()
                    .unwrap()
                    .notify();
            }
        }
    }

    /// Drains the external queue into the internal queue.
    /// Must be called from the runtime's own thread.
    #[inline]
    pub fn poll(&self) {
        let internal = unsafe { &mut * (self.0).3.get() };
        while let Some(task) = (self.0).2.pop() {
            internal.push_back(task);
        }
    }

    /// Creates an async wait handle (a 2-slot `wait_any` underneath).
    pub fn wait<V: 'static>(&self) -> AsyncWait<V> {
        AsyncWait::new(self.wait_any(2))
    }

    /// Creates a wait-any handle backed by a bounded channel of `capacity`.
    pub fn wait_any<V: 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
        let (producor, consumer) = async_bounded(capacity);
        AsyncWaitAny::new(capacity, producor, consumer)
    }

    /// Like `wait_any`, but with a completion callback.
    pub fn wait_any_callback<V: 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
        let (producor, consumer) = async_bounded(capacity);
        AsyncWaitAnyCallback::new(capacity, producor, consumer)
    }

    /// Creates a map-reduce handle backed by a bounded channel of `capacity`.
    pub fn map_reduce<V: 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
        let (producor, consumer) = async_bounded(capacity);
        AsyncMapReduce::new(0, capacity, producor, consumer)
    }

    /// Returns a future that yields control back to the executor once
    /// before resuming.
    pub fn yield_now(&self) -> LocalBoxFuture<'static, ()> {
        async move {
            YieldNow(false).await;
        }.boxed_local()
    }

    /// Pipes `input` through `filter`, yielding `Filtered` values and
    /// terminating the output stream on the first `Disconnect`.
    pub fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> LocalBoxStream<'static, FO>
    where
        S: Stream<Item = SO> + 'static,
        SO: 'static,
        F: FnMut(SO) -> AsyncPipelineResult<FO> + 'static,
        FO: 'static,
    {
        let output = stream! {
            for await value in input {
                match filter(value) {
                    AsyncPipelineResult::Disconnect => {
                        break;
                    },
                    AsyncPipelineResult::Filtered(result) => {
                        yield result;
                    },
                }
            }
        };
        output.boxed_local()
    }

    /// Runs `future` to completion on the current thread and returns its
    /// output. Must be called from the runtime's own thread (uses `spawn`).
    ///
    /// NOTE(review): once the internal queue drains, this loop busy-spins
    /// until the result appears, and it never calls `self.poll()` — a task
    /// that returns `Pending` and is later woken lands on the *external*
    /// queue, which this loop does not drain. Verify that callers only block
    /// on futures that complete without external wakeups.
    pub fn block_on<F>(&self, future: F) -> IOResult<F::Output>
    where
        F: Future + 'static,
        <F as Future>::Output: Default + 'static,
    {
        let runner = LocalTaskRunner(self.clone());
        let mut result: Option<<F as Future>::Output> = None;
        // SAFETY: the raw pointer into this stack slot is only written from
        // inside `runner.run_once()` calls below, i.e. strictly within this
        // stack frame's lifetime, so it never dangles.
        let result_ptr = (&mut result) as *mut Option<<F as Future>::Output>;
        self.spawn(async move {
            let r = future.await;
            unsafe {
                *result_ptr = Some(r);
            }
            // The spawned wrapper task itself must produce the runtime's `O`.
            Default::default()
        });
        loop {
            // Drive queued tasks until the internal queue is empty, then
            // check whether the blocked-on future produced its result.
            while self.internal_len() > 0 {
                runner.run_once();
            }
            if let Some(result) = result.take() {
                return Ok(result);
            }
        }
    }

    /// Stops the runtime by flipping the running flag `true -> false`.
    /// Returns `true` only if this call performed the transition.
    pub fn close(self) -> bool {
        // NOTE(review): on aarch64 (an LL/SC architecture where the weak form
        // can fail spuriously) the strong exchange is used; elsewhere the weak
        // form is considered equivalent. Confirm this was the intent.
        if cfg!(target_arch = "aarch64") {
            if let Ok(true) =
                (self.0)
                    .1
                    .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
            {
                true
            } else {
                false
            }
        } else {
            if let Ok(true) =
                (self.0)
                    .1
                    .compare_exchange_weak(true, false, Ordering::SeqCst, Ordering::SeqCst)
            {
                true
            } else {
                false
            }
        }
    }
}
/// Driver for a `LocalTaskRuntime`: constructs the runtime and runs its
/// poll/run loop, optionally sleeping in a `Poller` when idle.
pub struct LocalTaskRunner<O: Default + 'static = ()>(LocalTaskRuntime<O>);
// SAFETY: NOTE(review) — the runner may be moved onto the thread that drives
// the runtime (see `startup`), but must never be shared by reference across
// threads; the nightly `!Sync` impl enforces the latter.
unsafe impl<O: Default + 'static> Send for LocalTaskRunner<O> {}
impl<O: Default + 'static> !Sync for LocalTaskRunner<O> {}
impl<O: Default + 'static> LocalTaskRunner<O> {
    /// Creates a runner (and its runtime) without poller support: the
    /// coalesced-wakeup flag and poller slots are `None`, so `try_sleep`
    /// is a no-op and the driver loop spins.
    pub fn new() -> Self {
        #[cfg(not(target_arch = "wasm32"))]
        let inner = (
            alloc_rt_uid(),                   // unique runtime id
            Arc::new(AtomicBool::new(false)), // not running until `startup`
            SegQueue::new(),                  // external task queue
            UnsafeCell::new(VecDeque::new()), // internal task queue
            None,                             // no coalesced-wakeup flag
            None,                             // no poller
        );
        #[cfg(target_arch = "wasm32")]
        let inner = (
            crate::rt::alloc_rt_uid(),
            Arc::new(AtomicBool::new(false)),
            SegQueue::new(),
            UnsafeCell::new(VecDeque::new()),
            None,
        );
        LocalTaskRunner(LocalTaskRuntime(Arc::new(inner)))
    }

    /// Creates a runner whose runtime can block in `poller` when idle and be
    /// woken cross-thread via `Poller::notify` (see `send` / `try_sleep`).
    #[cfg(not(target_arch = "wasm32"))]
    pub fn with_poll(poller: Arc<Poller>) -> Self {
        let inner = (
            alloc_rt_uid(),
            Arc::new(AtomicBool::new(false)),
            SegQueue::new(),
            UnsafeCell::new(VecDeque::new()),
            Some(AtomicBool::new(false)), // coalesced-wakeup flag, initially clear
            Some(poller),
        );
        LocalTaskRunner(LocalTaskRuntime(Arc::new(inner)))
    }

    /// Whether this runner was built via `with_poll` (both the wakeup flag
    /// and the poller are present).
    #[cfg(not(target_arch = "wasm32"))]
    #[inline(always)]
    pub fn is_with_polling(&self) -> bool {
        self.0.0.4.is_some()
            && self.0.0.5.is_some()
    }

    /// Returns a cloneable handle to the underlying runtime.
    pub fn get_runtime(&self) -> LocalTaskRuntime<O> {
        self.0.clone()
    }

    /// Consumes the runner, spawning a named driver thread that marks the
    /// runtime running and spins poll/run until `close` clears the flag.
    /// Returns a runtime handle for the caller.
    pub fn startup(
        self,
        thread_name: &str,
        thread_stack_size: usize
    ) -> LocalTaskRuntime<O> {
        let rt = self.get_runtime();
        let rt_copy = rt.clone();
        let _ = thread::Builder::new()
            .name(thread_name.to_string())
            .stack_size(thread_stack_size)
            .spawn(move || {
                (rt_copy.0).1.store(true, Ordering::Relaxed);
                // Busy loop: drain external queue, then run one internal task.
                while rt_copy.is_running() {
                    self.poll();
                    self.run_once();
                }
            });
        rt
    }

    /// Like `startup`, but the driver thread may sleep in the poller after
    /// `try_count` idle iterations, waking on `Poller::notify` or `timeout`.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn startup_with_poll(
        self,
        thread_name: &str,
        thread_stack_size: usize,
        try_count: usize,
        timeout: Option<Duration>,
    ) -> LocalTaskRuntime<O> {
        let rt = self.get_runtime();
        let rt_copy = rt.clone();
        let _ = thread::Builder::new()
            .name(thread_name.to_string())
            .stack_size(thread_stack_size)
            .spawn(move || {
                (rt_copy.0).1.store(true, Ordering::Relaxed);
                let mut count = try_count;
                while rt_copy.is_running() {
                    self.poll();
                    self.run_once();
                    match self.try_sleep(count, timeout) {
                        Err(e) => {
                            // Poller failure is fatal: stop the runtime, then panic.
                            rt_copy.0.1.store(false, Ordering::Release);
                            panic!("Run runtime failed, reason: {:?}", e);
                        },
                        Ok(Some(new_count)) => {
                            // Still spinning down: keep the decremented budget.
                            count = new_count;
                            continue;
                        },
                        Ok(None) => {
                            // Slept and woke up: reset the idle budget.
                            count = try_count;
                            continue;
                        },
                    }
                }
            });
        rt
    }

    /// Drains the external queue into the internal queue
    /// (same job as `LocalTaskRuntime::poll`).
    #[inline]
    pub fn poll(&self) {
        while let Some(task) = ((self.0).0).2.pop() {
            unsafe {
                (&mut *((self.0).0).3.get()).push_back(task);
            }
        }
    }

    /// Pops one task from the internal queue and polls its future once.
    /// If the future is still `Pending`, the future is put back into the
    /// task (the wakeup path will re-enqueue the task later); if it is
    /// ready, the task is simply dropped with `inner` left as `None`.
    #[inline]
    pub fn run_once(&self) {
        unsafe {
            if let Some(task) = (&mut *(&(self.0).0).3.get()).pop_front() {
                let waker = waker_ref(&task);
                let mut context = Context::from_waker(&*waker);
                if let Some(mut future) = task.get_inner() {
                    if let Pending = future.as_mut().poll(&mut context) {
                        task.set_inner(Some(future));
                    }
                }
            }
        }
    }

    /// Idle-sleep protocol for poller-backed runners.
    ///
    /// Returns `Ok(Some(n))` when the driver should keep spinning with idle
    /// budget `n` (no poller, work pending, or budget not yet exhausted),
    /// `Ok(None)` after actually sleeping in the poller, and `Err` if the
    /// poller wait failed.
    #[cfg(not(target_arch = "wasm32"))]
    #[inline]
    pub fn try_sleep(
        &self,
        try_count: usize,
        timeout: Option<Duration>
    ) -> IOResult<Option<usize>> {
        if !self.is_with_polling() {
            // No poller configured: never sleep.
            return Ok(Some(try_count));
        }
        if self.0.len() != 0 {
            // Work is queued: keep running.
            return Ok(Some(try_count));
        }
        if try_count != 0 {
            // Idle, but spin a little longer before paying for a syscall.
            return Ok(Some(try_count - 1));
        }
        // Budget exhausted: block in the poller until notified or timed out.
        // Any delivered events are intentionally discarded — the wait is only
        // used as a parking primitive here.
        let mut events = Events::with_capacity(std::num::NonZeroUsize::new(1).unwrap());
        let _ = self
            .0
            .0
            .5
            .as_ref()
            .unwrap()
            .wait(&mut events, timeout)?;
        // Clear the coalesced-wakeup flag so the next `send` notifies again.
        self
            .0
            .0
            .4
            .as_ref()
            .unwrap()
            .store(false, Ordering::Release);
        return Ok(None);
    }

    /// Consumes the runner, returning the runtime handle.
    pub fn into_local(self) -> LocalTaskRuntime<O> {
        self.0
    }
}
#[test]
fn test_local_runtime_block_on() {
    // Counter that reports its final value and elapsed time when dropped.
    struct AtomicCounter(AtomicUsize, Instant);
    impl Drop for AtomicCounter {
        fn drop(&mut self) {
            println!(
                "!!!!!!drop counter, count: {:?}, time: {:?}",
                self.0.load(Ordering::Relaxed),
                Instant::now() - self.1
            );
        }
    }

    let rt = LocalTaskRunner::<()>::new().into_local();
    let shared = Arc::new(AtomicCounter(AtomicUsize::new(0), Instant::now()));
    // Drive ten million tiny futures through block_on on the current thread.
    for _ in 0..10000000 {
        let cloned = shared.clone();
        let _ = rt.block_on(async move { cloned.0.fetch_add(1, Ordering::Relaxed) });
    }
}