use std::thread;
use std::rc::Rc;
use std::io::Result;
use std::future::Future;
use std::cell::UnsafeCell;
use std::task::{Context, Poll};
use std::collections::VecDeque;
use std::io::{Error, Result as IOResult, ErrorKind};
use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
use futures::{task::{ArcWake, waker_ref},
future::{FutureExt, LocalBoxFuture},
stream::{StreamExt, Stream, LocalBoxStream}};
use async_stream::stream;
use crossbeam_channel::bounded;
use crossbeam_queue::SegQueue;
use flume::bounded as async_bounded;
use crate::{lock::spin,
rt::{AsyncPipelineResult, alloc_rt_uid,
serial::{AsyncWait, AsyncWaitAny, AsyncWaitAnyCallback, AsyncMapReduce}}};
pub(crate) struct LocalTask<O: Default + 'static = ()> {
inner: UnsafeCell<Option<LocalBoxFuture<'static, O>>>, runtime: LocalTaskRuntime<O>, }
unsafe impl<O: Default + 'static> Send for LocalTask<O> {}
unsafe impl<O: Default + 'static> Sync for LocalTask<O> {}
impl<O: Default + 'static> ArcWake for LocalTask<O> {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.runtime.will_wakeup_once(arc_self.clone());
}
}
impl<O: Default + 'static> LocalTask<O> {
pub fn get_inner(&self) -> Option<LocalBoxFuture<'static, O>> {
unsafe {
(&mut *self.inner.get()).take()
}
}
pub fn set_inner(&self, inner: Option<LocalBoxFuture<'static, O>>) {
unsafe {
*self.inner.get() = inner;
}
}
}
pub struct LocalTaskRuntime<O: Default + 'static = ()>(Arc<(
usize, //运行时唯一id
Arc<AtomicBool>, //运行状态
SegQueue<Arc<LocalTask<O>>>, //本地异步任务池
)>);
unsafe impl<O: Default + 'static> Send for LocalTaskRuntime<O> {}
impl<O: Default + 'static> !Sync for LocalTaskRuntime<O> {}
impl<O: Default + 'static> Clone for LocalTaskRuntime<O> {
fn clone(&self) -> Self {
LocalTaskRuntime(self.0.clone())
}
}
impl<O: Default + 'static> LocalTaskRuntime<O> {
#[inline]
pub fn is_running(&self) -> bool {
(self.0).1.load(Ordering::Relaxed)
}
pub fn get_id(&self) -> usize {
(self.0).0
}
pub fn len(&self) -> usize {
unsafe {
(self.0).2.len()
}
}
pub fn spawn<F>(&self, future: F)
where F: Future<Output = O> + 'static {
self.will_wakeup_once(Arc::new(LocalTask {
inner: UnsafeCell::new(Some(future.boxed_local())),
runtime: self.clone(),
}));
}
pub fn send<F>(&self, future: F)
where F: Future<Output = O> + 'static {
self.will_wakeup_once(Arc::new(LocalTask {
inner: UnsafeCell::new(Some(future.boxed_local())),
runtime: self.clone(),
}));
}
#[inline]
pub fn wakeup_once(&self) {
}
#[inline]
fn will_wakeup_once(&self, task: Arc<LocalTask<O>>) {
(self.0).2.push(task);
}
fn wait<V: 'static>(&self) -> AsyncWait<V> {
AsyncWait::new(self.wait_any(2))
}
fn wait_any<V: 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
let (producor, consumer) = async_bounded(capacity);
AsyncWaitAny::new(capacity, producor, consumer)
}
fn wait_any_callback<V: 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
let (producor, consumer) = async_bounded(capacity);
AsyncWaitAnyCallback::new(capacity, producor, consumer)
}
fn map_reduce<V: 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
let (producor, consumer) = async_bounded(capacity);
AsyncMapReduce::new(0, capacity, producor, consumer)
}
pub fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> LocalBoxStream<'static, FO>
where S: Stream<Item = SO> + 'static,
SO: 'static,
F: FnMut(SO) -> AsyncPipelineResult<FO> + 'static,
FO: 'static {
let output = stream! {
for await value in input {
match filter(value) {
AsyncPipelineResult::Disconnect => {
break;
},
AsyncPipelineResult::Filtered(result) => {
yield result;
},
}
}
};
output.boxed_local()
}
pub fn block_on<F>(&self, future: F) -> IOResult<F::Output>
where F: Future + 'static,
<F as Future>::Output: Default + 'static {
let (sender, receiver) = bounded(1);
self.spawn(async move {
let r = future.await;
sender.send(r);
Default::default()
});
self.wakeup_once();
let mut count = 0;
let mut spin_len = 1;
loop {
count += 1;
if count > 3 {
spin_len = spin(spin_len);
}
unsafe {
let option = (self.0).2.pop();
if let Some(task) = option {
let waker = waker_ref(&task);
let mut context = Context::from_waker(&*waker);
if let Some(mut future) = task.get_inner() {
if let Poll::Pending = future.as_mut().poll(&mut context) {
task.set_inner(Some(future));
}
}
}
}
match receiver.try_recv() {
Err(e) => {
if e.is_disconnected() {
return Err(Error::new(ErrorKind::Other, format!("Block on failed, reason: {:?}", e)));
}
},
Ok(result) => {
return Ok(result)
},
}
}
}
pub fn close(self) -> bool {
if cfg!(target_arch = "aarch64") {
if let Ok(true) = (self.0).1.compare_exchange(true,
false,
Ordering::SeqCst,
Ordering::SeqCst) {
true
} else {
false
}
} else {
if let Ok(true) = (self.0).1.compare_exchange_weak(true,
false,
Ordering::SeqCst,
Ordering::SeqCst) {
true
} else {
false
}
}
}
}
pub struct LocalTaskRunner<O: Default + 'static = ()>(LocalTaskRuntime<O>);
unsafe impl<O: Default + 'static> Send for LocalTaskRunner<O> {}
impl<O: Default + 'static> !Sync for LocalTaskRunner<O> {}
impl<O: Default + 'static> LocalTaskRunner<O> {
pub fn new() -> Self {
let inner = (
alloc_rt_uid(),
Arc::new(AtomicBool::new(false)),
SegQueue::new(),
);
LocalTaskRunner(LocalTaskRuntime(Arc::new(inner)))
}
pub fn get_runtime(&self) -> LocalTaskRuntime<O> {
self.0.clone()
}
pub fn startup(self,
thread_name: &str,
thread_stack_size: usize) -> LocalTaskRuntime<O> {
let rt = self.get_runtime();
let rt_copy = rt.clone();
thread::Builder::new()
.name(thread_name.to_string())
.stack_size(thread_stack_size)
.spawn(move || {
(rt_copy.0).1.store(true, Ordering::Relaxed);
while rt_copy.is_running() {
rt_copy.wakeup_once();
self.run_once();
}
});
rt
}
#[inline]
pub fn run_once(&self) {
unsafe {
let option = (&(self.0).0).2.pop();
if let Some(task) = option {
let waker = waker_ref(&task);
let mut context = Context::from_waker(&*waker);
if let Some(mut future) = task.get_inner() {
if let Poll::Pending = future.as_mut().poll(&mut context) {
task.set_inner(Some(future));
}
}
}
}
}
pub fn into_local(self) -> LocalTaskRuntime<O> {
self.0
}
}
#[test]
fn test_local_runtime_block_on() {
use std::time::Instant;
use std::ops::Drop;
use std::sync::atomic::AtomicUsize;
struct AtomicCounter(AtomicUsize, Instant);
impl Drop for AtomicCounter {
fn drop(&mut self) {
unsafe {
println!("!!!!!!drop counter, count: {:?}, time: {:?}", self.0.load(Ordering::Relaxed), Instant::now() - self.1);
}
}
}
let rt = LocalTaskRunner::<()>::new().into_local();
let counter = Arc::new(AtomicCounter(AtomicUsize::new(0), Instant::now()));
let start = Instant::now();
for _ in 0..10000000 {
let counter_copy = counter.clone();
let _ = rt.block_on(async move {
counter_copy.0.fetch_add(1, Ordering::Relaxed)
});
}
println!("!!!!!!spawn local task ok, time: {:?}", Instant::now() - start);
}