use std::future::Future;
use std::time::Duration;
use std::task::{Poll, Waker};
use std::io::{Error, Result, ErrorKind};
use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
use parking_lot::{Mutex, Condvar};
use futures::{future::LocalBoxFuture,
stream::{Stream, LocalBoxStream}};
use crate::rt::{TaskId, AsyncPipelineResult,
serial::{AsyncRuntime,
AsyncRuntimeExt,
AsyncTaskPool,
AsyncTaskPoolExt,
AsyncWait,
AsyncWaitAny,
AsyncWaitAnyCallback,
AsyncMapReduce,
LocalAsyncRuntime,
spawn_worker_thread, wakeup_worker_thread},
serial_single_thread::{SingleTaskPool, SingleTaskRunner, SingleTaskRuntime}};
/// Cheaply cloneable handle that bundles a worker status flag, the waker used
/// to signal the worker thread, and the single-thread runtime that the methods
/// of this type forward to.
///
/// The status flag is `true` while the worker accepts tasks; the `spawn*`
/// methods return an error once it is `false`, and `close` flips it to `false`.
pub struct WorkerRuntime<
O: Default + 'static = (),
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = SingleTaskPool<O>,
>(Arc<(
Arc<AtomicBool>, // worker status flag
Arc<(AtomicBool, Mutex<()>, Condvar)>, // worker thread waker
SingleTaskRuntime<O, P> // single-thread runtime
)>);
// Manually asserts that a `WorkerRuntime` may be moved to another thread.
// NOTE(review): soundness depends on the internals of `SingleTaskRuntime<O, P>`,
// which are not visible in this file — confirm before relying on it.
unsafe impl<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
> Send for WorkerRuntime<O, P> {}
// Manually asserts that a `WorkerRuntime` may be shared by reference between
// threads.
// NOTE(review): soundness depends on the internals of `SingleTaskRuntime<O, P>`,
// which are not visible in this file — confirm before relying on it.
unsafe impl<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
> Sync for WorkerRuntime<O, P> {}
impl<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
> Clone for WorkerRuntime<O, P> {
fn clone(&self) -> Self {
WorkerRuntime(self.0.clone())
}
}
/// `AsyncRuntime` implementation that forwards to the wrapped single-thread
/// runtime. Every `spawn*` method first checks the worker status flag and,
/// after handing the task to the inner runtime, signals the worker thread.
impl<
    O: Default + 'static,
    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
> AsyncRuntime<O> for WorkerRuntime<O, P> {
    type Pool = P;

    /// Forwards to the inner runtime's `shared_pool`.
    #[inline]
    fn shared_pool(&self) -> Arc<Self::Pool> {
        (self.0).2.shared_pool()
    }

    /// Forwards to the inner runtime's `get_id`.
    #[inline]
    fn get_id(&self) -> usize {
        (self.0).2.get_id()
    }

    /// Forwards to the inner runtime's `wait_len`.
    #[inline]
    fn wait_len(&self) -> usize {
        (self.0).2.wait_len()
    }

    /// Forwards to the inner runtime's `len`.
    #[inline]
    fn len(&self) -> usize {
        (self.0).2.len()
    }

    /// Forwards to the inner runtime's `alloc`.
    #[inline]
    fn alloc<R: 'static>(&self) -> TaskId {
        (self.0).2.alloc::<R>()
    }

    /// Spawns `future` on the inner runtime and signals the worker thread.
    ///
    /// # Errors
    /// Returns an `ErrorKind::Other` error when the worker status flag is
    /// `false`; otherwise returns whatever the inner runtime's `spawn` returns.
    fn spawn<F>(&self, future: F) -> Result<TaskId>
        where F: Future<Output = O> + 'static {
        if !(self.0).0.load(Ordering::SeqCst) {
            return Err(Error::new(ErrorKind::Other, "Spawn async task failed, reason: worker already closed"));
        }

        let result = (self.0).2.spawn(future);
        // The worker thread is signalled regardless of the spawn outcome.
        wakeup_worker_thread(&(self.0).1, &(self.0).2);
        result
    }

    /// Same as `spawn`, but forwards to the inner runtime's `spawn_local`.
    ///
    /// # Errors
    /// Returns an `ErrorKind::Other` error when the worker status flag is `false`.
    fn spawn_local<F>(&self, future: F) -> Result<TaskId>
        where
            F: Future<Output = O> + 'static {
        if !(self.0).0.load(Ordering::SeqCst) {
            return Err(Error::new(ErrorKind::Other, "Spawn local async task failed, reason: worker already closed"));
        }

        let result = (self.0).2.spawn_local(future);
        wakeup_worker_thread(&(self.0).1, &(self.0).2);
        result
    }

    /// Same as `spawn`, but forwards `priority` and `future` to the inner
    /// runtime's `spawn_priority`.
    ///
    /// # Errors
    /// Returns an `ErrorKind::Other` error when the worker status flag is `false`.
    fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
        where
            F: Future<Output = O> + 'static {
        if !(self.0).0.load(Ordering::SeqCst) {
            return Err(Error::new(ErrorKind::Other, "Spawn priority async task failed, reason: worker already closed"));
        }

        let result = (self.0).2.spawn_priority(priority, future);
        wakeup_worker_thread(&(self.0).1, &(self.0).2);
        result
    }

    /// Same as `spawn`, but forwards to the inner runtime's `spawn_yield`.
    ///
    /// # Errors
    /// Returns an `ErrorKind::Other` error when the worker status flag is `false`.
    fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
        where
            F: Future<Output = O> + 'static {
        if !(self.0).0.load(Ordering::SeqCst) {
            return Err(Error::new(ErrorKind::Other, "Spawn yield priority async task failed, reason: worker already closed"));
        }

        let result = (self.0).2.spawn_yield(future);
        wakeup_worker_thread(&(self.0).1, &(self.0).2);
        result
    }

    /// Same as `spawn`, but forwards `future` and `time` to the inner
    /// runtime's `spawn_timing`.
    ///
    /// # Errors
    /// Returns an `ErrorKind::Other` error when the worker status flag is `false`.
    fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
        where F: Future<Output = O> + 'static {
        if !(self.0).0.load(Ordering::SeqCst) {
            return Err(Error::new(ErrorKind::Other, "Spawn timing async task failed, reason: worker already closed"));
        }

        let result = (self.0).2.spawn_timing(future, time);
        wakeup_worker_thread(&(self.0).1, &(self.0).2);
        result
    }

    /// Spawns `future` under the caller-supplied `task_id` via the inner
    /// runtime's `spawn_by_id`, then signals the worker thread.
    ///
    /// # Errors
    /// Returns an `ErrorKind::Other` error when the worker status flag is `false`.
    fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
        where
            F: Future<Output=O> + 'static {
        if !(self.0).0.load(Ordering::SeqCst) {
            return Err(Error::new(ErrorKind::Other, "Spawn async task by id failed, reason: worker already closed"));
        }

        let result = (self.0).2.spawn_by_id(task_id, future);
        wakeup_worker_thread(&(self.0).1, &(self.0).2);
        result
    }

    /// Same as `spawn_by_id`, but forwards to the inner runtime's
    /// `spawn_local_by_id`.
    ///
    /// # Errors
    /// Returns an `ErrorKind::Other` error when the worker status flag is `false`.
    fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
        where
            F: Future<Output=O> + 'static {
        if !(self.0).0.load(Ordering::SeqCst) {
            return Err(Error::new(ErrorKind::Other, "Spawn local async task by id failed, reason: worker already closed"));
        }

        let result = (self.0).2.spawn_local_by_id(task_id, future);
        wakeup_worker_thread(&(self.0).1, &(self.0).2);
        result
    }

    /// Same as `spawn_by_id`, but forwards to the inner runtime's
    /// `spawn_priority_by_id`.
    ///
    /// # Errors
    /// Returns an `ErrorKind::Other` error when the worker status flag is `false`.
    fn spawn_priority_by_id<F>(&self,
                               task_id: TaskId,
                               priority: usize,
                               future: F) -> Result<()>
        where
            F: Future<Output=O> + 'static {
        if !(self.0).0.load(Ordering::SeqCst) {
            return Err(Error::new(ErrorKind::Other, "Spawn priority async task by id failed, reason: worker already closed"));
        }

        let result = (self.0).2.spawn_priority_by_id(task_id, priority, future);
        wakeup_worker_thread(&(self.0).1, &(self.0).2);
        result
    }

    /// Same as `spawn_by_id`, but forwards to the inner runtime's
    /// `spawn_yield_by_id`.
    ///
    /// # Errors
    /// Returns an `ErrorKind::Other` error when the worker status flag is `false`.
    fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
        where
            F: Future<Output=O> + 'static {
        if !(self.0).0.load(Ordering::SeqCst) {
            return Err(Error::new(ErrorKind::Other, "Spawn yield async task by id failed, reason: worker already closed"));
        }

        let result = (self.0).2.spawn_yield_by_id(task_id, future);
        wakeup_worker_thread(&(self.0).1, &(self.0).2);
        result
    }

    /// Same as `spawn_by_id`, but forwards to the inner runtime's
    /// `spawn_timing_by_id`.
    ///
    /// # Errors
    /// Returns an `ErrorKind::Other` error when the worker status flag is `false`.
    fn spawn_timing_by_id<F>(&self,
                             task_id: TaskId,
                             future: F,
                             time: usize) -> Result<()>
        where
            F: Future<Output=O> + 'static {
        if !(self.0).0.load(Ordering::SeqCst) {
            return Err(Error::new(ErrorKind::Other, "Spawn timing async task by id failed, reason: worker already closed"));
        }

        let result = (self.0).2.spawn_timing_by_id(task_id, future, time);
        wakeup_worker_thread(&(self.0).1, &(self.0).2);
        result
    }

    /// Forwards to the inner runtime's `pending`.
    #[inline]
    fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output> {
        (self.0).2.pending::<Output>(task_id, waker)
    }

    /// Forwards to the inner runtime's `wakeup`.
    #[inline]
    fn wakeup<Output: 'static>(&self, task_id: &TaskId) {
        (self.0).2.wakeup::<Output>(task_id);
    }

    /// Forwards to the inner runtime's `wait`.
    #[inline]
    fn wait<V: 'static>(&self) -> AsyncWait<V> {
        (self.0).2.wait()
    }

    /// Forwards to the inner runtime's `wait_any`.
    #[inline]
    fn wait_any<V: 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
        (self.0).2.wait_any(capacity)
    }

    /// Forwards to the inner runtime's `wait_any_callback`.
    #[inline]
    fn wait_any_callback<V: 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
        (self.0).2.wait_any_callback(capacity)
    }

    /// Forwards to the inner runtime's `map_reduce`.
    #[inline]
    fn map_reduce<V: 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
        (self.0).2.map_reduce(capacity)
    }

    /// Forwards to the inner runtime's `timeout`.
    #[inline]
    fn timeout(&self, timeout: usize) -> LocalBoxFuture<'static, ()> {
        (self.0).2.timeout(timeout)
    }

    /// Forwards to the inner runtime's `yield_now`.
    #[inline]
    fn yield_now(&self) -> LocalBoxFuture<'static, ()> {
        (self.0).2.yield_now()
    }

    /// Forwards `input` and `filter` to the inner runtime's `pipeline`.
    #[inline]
    fn pipeline<S, SO, F, FO>(&self, input: S, filter: F) -> LocalBoxStream<'static, FO>
        where S: Stream<Item = SO> + 'static,
              SO: 'static,
              F: FnMut(SO) -> AsyncPipelineResult<FO> + 'static,
              FO: 'static {
        (self.0).2.pipeline(input, filter)
    }

    /// Marks the worker as closed and signals the worker thread.
    ///
    /// Returns `true` only for the call that actually flipped the status flag
    /// from `true` to `false`; returns `false` if the worker was already closed.
    fn close(&self) -> bool {
        // The strong compare-exchange is used on every target: the weak form
        // is allowed to fail spuriously, which without a retry loop would
        // report "not closed" while leaving the flag set.
        let closed = (self.0).0
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok();

        if closed {
            // Signal the worker thread so it can observe the new status.
            wakeup_worker_thread(&(self.0).1, &(self.0).2);
        }

        closed
    }
}
impl<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
> AsyncRuntimeExt<O> for WorkerRuntime<O, P> {
#[inline]
fn spawn_with_context<F, C>(&self,
task_id: TaskId,
future: F,
context: C) -> Result<()>
where F: Future<Output = O> + 'static,
C: 'static {
(self.0).2.spawn_with_context(task_id, future, context)
}
#[inline]
fn spawn_timing_with_context<F, C>(&self,
task_id: TaskId,
future: F,
context: C,
time: usize) -> Result<()>
where F: Future<Output = O> + 'static,
C: 'static {
(self.0).2.spawn_timing_with_context(task_id, future, context, time)
}
#[inline]
fn block_on<F>(&self, future: F) -> Result<F::Output>
where F: Future + 'static,
<F as Future>::Output: Default + 'static {
(self.0).2.block_on::<F>(future)
}
}
impl<
    O: Default + 'static,
    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
> WorkerRuntime<O, P> {
    /// Returns the shared worker status flag.
    pub fn get_worker_status(&self) -> &Arc<AtomicBool> {
        &(self.0).0
    }

    /// Returns the shared worker thread waker.
    pub fn get_worker_waker(&self) -> &Arc<(AtomicBool, Mutex<()>, Condvar)> {
        &(self.0).1
    }

    /// Returns the wrapped single-thread runtime.
    pub fn get_worker_runtime(&self) -> &SingleTaskRuntime<O, P> {
        &(self.0).2
    }

    /// Builds a `LocalAsyncRuntime` from a raw pointer to this runtime's
    /// shared state plus the `*_raw` trampolines defined below.
    ///
    /// Each call converts a fresh clone of the inner `Arc` into a raw pointer
    /// (see `as_raw`). Nothing in this file converts that pointer back in
    /// order to release it.
    /// NOTE(review): presumably `LocalAsyncRuntime` owns that reference —
    /// confirm, otherwise every call leaks one strong count.
    pub fn to_local_runtime(&self) -> LocalAsyncRuntime<O> {
        LocalAsyncRuntime::new(
            self.as_raw(),
            WorkerRuntime::<O, P>::get_id_raw,
            WorkerRuntime::<O, P>::spawn_raw,
            WorkerRuntime::<O, P>::spawn_timing_raw,
            WorkerRuntime::<O, P>::timeout_raw
        )
    }

    /// Clones the inner `Arc` and returns it as a type-erased raw pointer.
    ///
    /// The returned pointer carries one strong reference.
    #[inline]
    pub(crate) fn as_raw(&self) -> *const () {
        Arc::into_raw(self.0.clone()) as *const ()
    }

    /// Rebuilds a `WorkerRuntime` from a pointer produced by `as_raw`.
    ///
    /// The result takes over the strong reference carried by `raw`; dropping
    /// it decrements the count. `raw` must originate from `as_raw` of a
    /// `WorkerRuntime<O, P>` with the same type parameters.
    #[inline]
    pub(crate) fn from_raw(raw: *const ()) -> Self {
        // SAFETY: relies on the caller passing a pointer obtained from
        // `as_raw` for the same `O` and `P`, so the pointee type matches.
        let inner = unsafe {
            Arc::from_raw(raw as *const (
                Arc<AtomicBool>,
                Arc<(AtomicBool, Mutex<()>, Condvar)>,
                SingleTaskRuntime<O, P>,
            ))
        };

        WorkerRuntime(inner)
    }

    /// Trampoline: returns the runtime id for the runtime behind `raw`
    /// without consuming the strong reference that `raw` carries.
    pub(crate) fn get_id_raw(raw: *const ()) -> usize {
        // `ManuallyDrop` keeps the borrowed strong count untouched on every
        // exit path, including unwinding.
        let rt = std::mem::ManuallyDrop::new(WorkerRuntime::<O, P>::from_raw(raw));
        rt.get_id()
    }

    /// Trampoline: allocates a task id and spawns `future` on the runtime
    /// behind `raw` without consuming the strong reference that `raw` carries.
    pub(crate) fn spawn_raw(raw: *const (),
                            future: LocalBoxFuture<'static, O>) -> Result<()> {
        // See `get_id_raw` for why the handle is wrapped in `ManuallyDrop`.
        let rt = std::mem::ManuallyDrop::new(WorkerRuntime::<O, P>::from_raw(raw));
        rt.spawn_by_id(rt.alloc::<O>(), future)
    }

    /// Trampoline: allocates a task id and spawns `future` via
    /// `spawn_timing_by_id` on the runtime behind `raw` without consuming the
    /// strong reference that `raw` carries.
    pub(crate) fn spawn_timing_raw(raw: *const (),
                                   future: LocalBoxFuture<'static, O>,
                                   timeout: usize) -> Result<()> {
        // See `get_id_raw` for why the handle is wrapped in `ManuallyDrop`.
        let rt = std::mem::ManuallyDrop::new(WorkerRuntime::<O, P>::from_raw(raw));
        rt.spawn_timing_by_id(rt.alloc::<O>(), future, timeout)
    }

    /// Trampoline: returns the `timeout` future of the runtime behind `raw`
    /// without consuming the strong reference that `raw` carries.
    pub(crate) fn timeout_raw(raw: *const (),
                              timeout: usize) -> LocalBoxFuture<'static, ()> {
        // See `get_id_raw` for why the handle is wrapped in `ManuallyDrop`.
        let rt = std::mem::ManuallyDrop::new(WorkerRuntime::<O, P>::from_raw(raw));
        rt.timeout(timeout)
    }
}
/// Cheaply cloneable handle that bundles the worker status flag, the worker
/// thread waker, the single-thread task runner, and the `WorkerRuntime` that
/// is handed out to callers.
pub struct WorkerTaskRunner<
O: Default + 'static = (),
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P> = SingleTaskPool<O>,
>(Arc<(
Arc<AtomicBool>, // worker status flag
Arc<(AtomicBool, Mutex<()>, Condvar)>, // worker thread waker
SingleTaskRunner<O, P>, // single-thread async task runner
WorkerRuntime<O, P>, // worker runtime
)>);
// Manually asserts that a `WorkerTaskRunner` may be moved to another thread.
// NOTE(review): soundness depends on the internals of `SingleTaskRunner<O, P>`
// and `SingleTaskRuntime<O, P>`, which are not visible in this file — confirm.
unsafe impl<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
> Send for WorkerTaskRunner<O, P> {}
// Manually asserts that a `WorkerTaskRunner` may be shared by reference
// between threads.
// NOTE(review): soundness depends on the internals of `SingleTaskRunner<O, P>`
// and `SingleTaskRuntime<O, P>`, which are not visible in this file — confirm.
unsafe impl<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
> Sync for WorkerTaskRunner<O, P> {}
impl<
O: Default + 'static,
P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
> Clone for WorkerTaskRunner<O, P> {
fn clone(&self) -> Self {
WorkerTaskRunner(self.0.clone())
}
}
impl<
    O: Default + 'static,
    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
> From<(Arc<AtomicBool>, Arc<(AtomicBool, Mutex<()>, Condvar)>, SingleTaskRuntime<O, P>)> for WorkerRuntime<O, P> {
    /// Wraps a `(status flag, waker, single-thread runtime)` triple in a new
    /// `Arc` to form a `WorkerRuntime`.
    fn from(parts: (Arc<AtomicBool>,
                    Arc<(AtomicBool, Mutex<()>, Condvar)>,
                    SingleTaskRuntime<O, P>,)) -> Self {
        let shared = Arc::new(parts);
        Self(shared)
    }
}
impl<O: Default + 'static> Default for WorkerTaskRunner<O> {
    /// Creates a runner over a default `SingleTaskPool`, with the status flag
    /// initialised to `true` and the waker's flag initialised to `false`.
    fn default() -> Self {
        let pool = SingleTaskPool::default();
        let status = Arc::new(AtomicBool::new(true));
        let waker = Arc::new((AtomicBool::new(false), Mutex::new(()), Condvar::new()));

        Self::new(pool, status, waker)
    }
}
impl<
    O: Default + 'static,
    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
> WorkerTaskRunner<O, P> {
    /// Builds a runner over `pool` that shares `worker_status` and
    /// `worker_waker` with the `WorkerRuntime` it creates.
    ///
    /// # Panics
    /// Panics if `startup()` of the freshly created `SingleTaskRunner` does
    /// not yield a runtime.
    pub fn new(pool: P,
               worker_status: Arc<AtomicBool>,
               worker_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) -> Self {
        let task_runner = SingleTaskRunner::new(pool);
        let single_rt = task_runner.startup().unwrap();

        // The runtime handle and the runner share the same status flag and waker.
        let runtime = WorkerRuntime(Arc::new((
            Arc::clone(&worker_status),
            Arc::clone(&worker_waker),
            single_rt,
        )));

        WorkerTaskRunner(Arc::new((
            worker_status,
            worker_waker,
            task_runner,
            runtime,
        )))
    }

    /// Returns a clone of the worker runtime handle.
    pub fn get_runtime(&self) -> WorkerRuntime<O, P> {
        let shared = &self.0;
        shared.3.clone()
    }

    /// Forwards to the inner runner's `run_once`.
    #[inline]
    pub fn run_once(&self) -> Result<usize> {
        let shared = &self.0;
        shared.2.run_once()
    }

    /// Forwards to the inner runner's `run`.
    #[inline]
    pub fn run(&self) -> Result<usize> {
        let shared = &self.0;
        shared.2.run()
    }

    /// Hands the status flag, the waker and the supplied settings to
    /// `spawn_worker_thread`, then returns the worker runtime handle.
    ///
    /// The queue-length callback given to `spawn_worker_thread` reports the
    /// runtime's `wait_len()` plus the caller's `get_queue_len()`.
    pub fn startup<LF, GQL>(self,
                            thread_name: &str,
                            thread_stack_size: usize,
                            sleep_timeout: u64,
                            loop_interval: Option<u64>,
                            loop_func: LF,
                            get_queue_len: GQL) -> WorkerRuntime<O, P>
        where P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
              LF: Fn() -> (bool, Duration) + Send + 'static,
              GQL: Fn() -> usize + Send + 'static{
        let shared = &self.0;
        let runtime = shared.3.clone();
        let counting_rt = runtime.clone();
        let status = shared.0.clone();
        let waker = shared.1.clone();

        spawn_worker_thread(
            thread_name,
            thread_stack_size,
            status,
            waker,
            sleep_timeout,
            loop_interval,
            loop_func,
            move || {
                counting_rt.wait_len() + get_queue_len()
            },
        );

        runtime
    }
}