pi_async_rt/rt/
mod.rs

1//! # 提供了通用的异步运行时
2//!
3
4use std::thread;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::ptr::null_mut;
8use std::vec::IntoIter;
9use std::future::Future;
10use std::panic::set_hook;
11use std::any::{Any, TypeId};
12use std::marker::PhantomData;
13use std::ops::{Deref, DerefMut};
14use std::cell::{RefCell, UnsafeCell};
15use std::task::{Waker, Context, Poll};
16use std::time::{Duration, SystemTime};
17use std::io::{Error, Result, ErrorKind};
18use std::alloc::{Layout, set_alloc_error_hook};
19use std::fmt::{Debug, Formatter, Result as FmtResult};
20use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, AtomicPtr, Ordering};
21
22pub mod single_thread;
23pub mod multi_thread;
24pub mod worker_thread;
25pub mod serial;
26pub mod serial_local_thread;
27pub mod serial_single_thread;
28pub mod serial_worker_thread;
29pub mod serial_local_compatible_wasm_runtime;
30
31use libc;
32use futures::{future::{FutureExt, BoxFuture},
33              stream::{Stream, BoxStream},
34              task::ArcWake};
35use parking_lot::{Mutex, Condvar};
36use crossbeam_channel::{Sender, Receiver, unbounded};
37use crossbeam_queue::ArrayQueue;
38use crossbeam_utils::atomic::AtomicCell;
39use flume::{Sender as AsyncSender, Receiver as AsyncReceiver};
40use num_cpus;
41use backtrace::Backtrace;
42use slotmap::{Key, KeyData};
43use quanta::{Clock, Upkeep, Handle, Instant as QInstant};
44
45use pi_hash::XHashMap;
46use pi_cancel_timer::Timer;
47use pi_timer::Timer as NotCancelTimer;
48
49use single_thread::SingleTaskRuntime;
50use worker_thread::{WorkerTaskRunner, WorkerRuntime};
51use multi_thread::{MultiTaskRuntimeBuilder, MultiTaskRuntime};
52
53use crate::lock::spin;
54
55/*
56* 本地线程绑定的异步运行时
57*/
58thread_local! {
59    static PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME: AtomicPtr<()> = AtomicPtr::new(null_mut());
60    static PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME_DICT: UnsafeCell<XHashMap<TypeId, Box<dyn Any + 'static>>> = UnsafeCell::new(XHashMap::default());
61}
62
63/*
64* 本地线程唯一id
65*/
66thread_local! {
67    static PI_ASYNC_THREAD_LOCAL_ID: UnsafeCell<usize> = UnsafeCell::new(usize::MAX);
68}
69
70/*
71* 默认的最高优先级边界
72*/
73const DEFAULT_MAX_HIGH_PRIORITY_BOUNDED: usize = 10;
74
75/*
76* 默认的高优先级边界
77*/
78const DEFAULT_HIGH_PRIORITY_BOUNDED: usize = 5;
79
80/*
81* 默认的最低优先级
82*/
83const DEFAULT_MAX_LOW_PRIORITY_BOUNDED: usize = 0;
84
85/*
86* 异步运行时唯一id生成器
87*/
88static RUNTIME_UID_GEN: AtomicUsize = AtomicUsize::new(1);
89
90/*
91* 全局时间状态
92*/
93static GLOBAL_TIME_LOOP_STATUS: AtomicBool = AtomicBool::new(false);
94
95///
96/// 启动全局时间循环,成功则返回句柄,释放句柄将关闭全局时间循环,失败表示已启动,则返回空
97/// 更新间隔时长为毫秒
98///
99pub fn startup_global_time_loop(interval: u64) -> Option<GlobalTimeLoopHandle> {
100    if let Err(_) = GLOBAL_TIME_LOOP_STATUS.compare_exchange(false,
101                                                             true,
102                                                             Ordering::AcqRel,
103                                                             Ordering::Relaxed) {
104        //已启动
105        None
106    } else {
107        //未启动
108        let timer = Upkeep::new_with_clock(Duration::from_millis(interval), Clock::new());
109        let handle = timer.start().unwrap();
110        let clock = Clock::new();
111        let _now = clock.recent();
112
113        Some(GlobalTimeLoopHandle(handle))
114    }
115}
116
117///
118/// 全局时间循环句柄
119///
120pub struct GlobalTimeLoopHandle(Handle);
121
122impl Drop for GlobalTimeLoopHandle {
123    fn drop(&mut self) {
124        GLOBAL_TIME_LOOP_STATUS.store(false, Ordering::Release);
125    }
126}
127
128///
129/// 分配异步运行时唯一id
130///
131pub fn alloc_rt_uid() -> usize {
132    RUNTIME_UID_GEN.fetch_add(1, Ordering::Relaxed)
133}
134
135///
136/// 异步任务唯一id
137///
138pub struct TaskId(UnsafeCell<u128>);
139
140impl Debug for TaskId {
141    fn fmt(&self, f: &mut Formatter) -> FmtResult {
142        write!(f, "TaskId[inner = {}]", unsafe { *self.0.get() })
143    }
144}
145
146impl Clone for TaskId {
147    fn clone(&self) -> Self {
148        unsafe {
149            TaskId(UnsafeCell::new(*self.0.get()))
150        }
151    }
152}
153
154impl TaskId {
155    /// 线程安全的判断异步任务唯一id对应的异步任务的唤醒器是否存在
156    #[inline]
157    pub fn exist_waker<R: 'static>(&self) -> bool {
158        unsafe {
159            let handle = unsafe { TaskHandle::<R>::from_raw((*self.0.get() >> 64) as *const ()) };
160            let inner = &*handle.0;
161            let r = if let Some(waker) = inner.0.swap(None) {
162                inner.0.swap(Some(waker));
163                true
164            } else {
165                false
166            };
167
168            //避免提前释放
169            handle.into_raw();
170
171            r
172        }
173    }
174
175    /// 线程安全的唤醒异步任务唯一id对应的异步任务
176    #[inline]
177    pub fn wakeup<R: 'static>(&self) {
178        unsafe {
179            let handle = unsafe { TaskHandle::<R>::from_raw((*self.0.get() >> 64) as *const ()) };
180            let inner = &*handle.0;
181            if let Some(waker) = inner.0.swap(None) {
182                //当前异步任务的唤醒器存在,则唤醒
183                waker.wake();
184            }
185
186            //避免提前释放
187            handle.into_raw();
188        }
189    }
190
191    /// 线程安全的为异步任务唯一id对应的异步任务设置唤醒器
192    #[inline]
193    pub fn set_waker<R: 'static>(&self, waker: Waker) -> Option<Waker> {
194        unsafe {
195            let handle = unsafe { TaskHandle::<R>::from_raw((*self.0.get() >> 64) as *const ()) };
196            let inner = &*handle.0;
197            let r = inner.0.swap(Some(waker));
198
199            //避免提前释放
200            handle.into_raw();
201
202            r
203        }
204    }
205
206    /// 线程安全的获取异步任务唯一id对应的异步任务的返回值
207    #[inline]
208    pub fn result<R: 'static>(&self) -> Option<R> {
209        unsafe {
210            let handle = unsafe { TaskHandle::<R>::from_raw((*self.0.get() >> 64) as *const ()) };
211            let inner = &*handle.0;
212            let r = inner.1.swap(None);
213
214            //避免提前释放
215            handle.into_raw();
216
217            r
218        }
219    }
220
221    /// 线程安全的为异步任务唯一id对应的异步任务设置返回值
222    #[inline]
223    pub fn set_result<R: 'static>(&self, result: R) -> Option<R> {
224        unsafe {
225            let handle = unsafe { TaskHandle::<R>::from_raw((*self.0.get() >> 64) as *const ()) };
226            let inner = &*handle.0;
227            let r = inner.1.swap(Some(result));
228
229            //避免提前释放
230            handle.into_raw();
231
232            r
233        }
234    }
235}
236
237// 异步任务句柄
238pub(crate) struct TaskHandle<R: 'static>(Box<(
239    AtomicCell<Option<Waker>>,  //任务唤醒器
240    AtomicCell<Option<R>>,      //任务返回值
241)>);
242
243impl<R: 'static> Default for TaskHandle<R> {
244    fn default() -> Self {
245        TaskHandle(Box::new((AtomicCell::new(None), AtomicCell::new(None))))
246    }
247}
248
249impl<R: 'static> TaskHandle<R> {
250    /// 将祼指针转换为异步任务句柄
251    pub unsafe fn from_raw(raw: *const ()) -> TaskHandle<R> {
252        let inner
253            = Box::from_raw(raw as *const (AtomicCell<Option<Waker>>, AtomicCell<Option<R>>) as *mut (AtomicCell<Option<Waker>>, AtomicCell<Option<R>>));
254        TaskHandle(inner)
255    }
256
257    /// 将异步任务句柄转换为祼指针
258    pub fn into_raw(self) -> *const () {
259        Box::into_raw(self.0)
260            as *mut (AtomicCell<Option<Waker>>, AtomicCell<Option<R>>)
261            as *const (AtomicCell<Option<Waker>>, AtomicCell<Option<R>>)
262            as *const ()
263    }
264}
265
266///
267/// 异步任务
268///
269pub struct AsyncTask<
270    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
271    O: Default + 'static = (),
272> {
273    uid:        TaskId,                                 //任务唯一id
274    future:     Mutex<Option<BoxFuture<'static, O>>>,   //异步任务
275    pool:       Arc<P>,                                 //异步任务池
276    priority:   usize,                                  //异步任务优先级
277    context:    Option<UnsafeCell<Box<dyn Any>>>,       //异步任务上下文
278}
279
280impl<
281    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
282    O: Default + 'static,
283> Drop for AsyncTask<P, O> {
284    fn drop(&mut self) {
285        let _ = unsafe { TaskHandle::<O>::from_raw((*self.uid.0.get() >> 64) as usize as *const ()) };
286    }
287}
288
289unsafe impl<
290    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
291    O: Default + 'static,
292> Send for AsyncTask<P, O> {}
293unsafe impl<
294    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
295    O: Default + 'static,
296> Sync for AsyncTask<P, O> {}
297
298impl<
299    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
300    O: Default + 'static,
301> ArcWake for AsyncTask<P, O> {
302    #[cfg(not(target_arch = "aarch64"))]
303    fn wake_by_ref(arc_self: &Arc<Self>) {
304        let pool = arc_self.get_pool();
305        let _ = pool.push_keep(arc_self.clone());
306
307        if let Some(waits) = pool.get_waits() {
308            //当前任务属于多线程异步运行时
309            if let Some(worker_waker) = waits.pop() {
310                //有待唤醒的工作者
311                let (is_sleep, lock, condvar) = &*worker_waker;
312                let _locked = lock.lock();
313                if is_sleep.load(Ordering::Relaxed) {
314                    //待唤醒的工作者,正在休眠,则立即唤醒此工作者
315                    if let Ok(true) = is_sleep
316                        .compare_exchange_weak(true,
317                                               false,
318                                               Ordering::SeqCst,
319                                               Ordering::SeqCst) {
320                        //确认需要唤醒,则唤醒
321                        condvar.notify_one();
322                    }
323                }
324            }
325        } else {
326            //当前线程属于单线程异步运行时
327            if let Some(thread_waker) = pool.get_thread_waker() {
328                //当前任务池绑定了所在线程的唤醒器,则快速检查是否需要唤醒所在线程
329                if thread_waker.0.load(Ordering::Relaxed) {
330                    let (is_sleep, lock, condvar) = &**thread_waker;
331                    let _locked = lock.lock();
332                    //待唤醒的线程,正在休眠,则立即唤醒此线程
333                    if let Ok(true) = is_sleep
334                        .compare_exchange_weak(true,
335                                               false,
336                                               Ordering::SeqCst,
337                                               Ordering::SeqCst) {
338                        //确认需要唤醒,则唤醒
339                        condvar.notify_one();
340                    }
341                }
342            }
343        }
344    }
345    #[cfg(target_arch = "aarch64")]
346    fn wake_by_ref(arc_self: &Arc<Self>) {
347        let pool = arc_self.get_pool();
348        let _ = pool.push_keep(arc_self.clone());
349
350        if let Some(waits) = pool.get_waits() {
351            //当前任务属于多线程异步运行时
352            if let Some(worker_waker) = waits.pop() {
353                //有待唤醒的工作者
354                let (is_sleep, lock, condvar) = &*worker_waker;
355                let locked = lock.lock();
356                if is_sleep.load(Ordering::Relaxed) {
357                    //待唤醒的工作者,正在休眠,则立即唤醒此工作者
358                    if let Ok(true) = is_sleep
359                        .compare_exchange(true,
360                                          false,
361                                          Ordering::SeqCst,
362                                          Ordering::SeqCst) {
363                        //确认需要唤醒,则唤醒
364                        condvar.notify_one();
365                    }
366                }
367            }
368        } else {
369            //当前线程属于单线程异步运行时
370            if let Some(thread_waker) = pool.get_thread_waker() {
371                //当前任务池绑定了所在线程的唤醒器,则快速检查是否需要唤醒所在线程
372                if thread_waker.0.load(Ordering::Relaxed) {
373                    let (is_sleep, lock, condvar) = &**thread_waker;
374                    let locked = lock.lock();
375                    //待唤醒的线程,正在休眠,则立即唤醒此线程
376                    if let Ok(true) = is_sleep
377                        .compare_exchange(true,
378                                          false,
379                                          Ordering::SeqCst,
380                                          Ordering::SeqCst) {
381                        //确认需要唤醒,则唤醒
382                        condvar.notify_one();
383                    }
384                }
385            }
386        }
387    }
388}
389
390impl<
391    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
392    O: Default + 'static,
393> AsyncTask<P, O> {
394    /// 构建单线程任务
395    pub fn new(uid: TaskId,
396               pool: Arc<P>,
397               priority: usize,
398               future: Option<BoxFuture<'static, O>>) -> AsyncTask<P, O> {
399        AsyncTask {
400            uid,
401            future: Mutex::new(future),
402            pool,
403            priority,
404            context: None,
405        }
406    }
407
408    /// 使用指定上下文构建单线程任务
409    pub fn with_context<C: 'static>(uid: TaskId,
410                                    pool: Arc<P>,
411                                    priority: usize,
412                                    future: Option<BoxFuture<'static, O>>,
413                                    context: C) -> AsyncTask<P, O> {
414        let any = Box::new(context);
415
416        AsyncTask {
417            uid,
418            future: Mutex::new(future),
419            pool,
420            priority,
421            context: Some(UnsafeCell::new(any)),
422        }
423    }
424
425    /// 使用指定异步运行时和上下文构建单线程任务
426    pub fn with_runtime_and_context<RT, C>(runtime: &RT,
427                                           priority: usize,
428                                           future: Option<BoxFuture<'static, O>>,
429                                           context: C) -> AsyncTask<P, O>
430        where RT: AsyncRuntime<O, Pool = P>,
431              C: Send + 'static {
432        let any = Box::new(context);
433
434        AsyncTask {
435            uid: runtime.alloc::<O>(),
436            future: Mutex::new(future),
437            pool: runtime.shared_pool(),
438            priority,
439            context: Some(UnsafeCell::new(any)),
440        }
441    }
442
443    /// 检查是否允许唤醒
444    pub fn is_enable_wakeup(&self) -> bool {
445        self.uid.exist_waker::<O>()
446    }
447
448    /// 获取内部任务
449    pub fn get_inner(&self) -> Option<BoxFuture<'static, O>> {
450        self.future.lock().take()
451    }
452
453    /// 设置内部任务
454    pub fn set_inner(&self, inner: Option<BoxFuture<'static, O>>) {
455        *self.future.lock() = inner;
456    }
457
458    /// 获取任务的所有者
459    #[inline]
460    pub fn owner(&self) -> usize {
461        unsafe {
462            *self.uid.0.get() as usize
463        }
464    }
465
466    /// 获取异步任务优先级
467    #[inline]
468    pub fn priority(&self) -> usize {
469        self.priority
470    }
471
472    //判断异步任务是否有上下文
473    pub fn exist_context(&self) -> bool {
474        self.context.is_some()
475    }
476
477    //获取异步任务上下文的只读引用
478    pub fn get_context<C: Send + 'static>(&self) -> Option<&C> {
479        if let Some(context) = &self.context {
480            //存在上下文
481            let any = unsafe { &*context.get() };
482            return <dyn Any>::downcast_ref::<C>(&**any);
483        }
484
485        None
486    }
487
488    //获取异步任务上下文的可写引用
489    pub fn get_context_mut<C: Send + 'static>(&self) -> Option<&mut C> {
490        if let Some(context) = &self.context {
491            //存在上下文
492            let any = unsafe { &mut *context.get() };
493            return <dyn Any>::downcast_mut::<C>(&mut **any);
494        }
495
496        None
497    }
498
499    //设置异步任务上下文,返回上一个异步任务上下文
500    pub fn set_context<C: Send + 'static>(&self, new: C) {
501        if let Some(context) = &self.context {
502            //存在上一个上下文,则释放上一个上下文
503            let _ = unsafe { &*context.get() };
504
505            //设置新的上下文
506            let any: Box<dyn Any + 'static> = Box::new(new);
507            unsafe { *context.get() = any; }
508        }
509    }
510
511    //获取异步任务的任务池
512    pub fn get_pool(&self) -> &P {
513        self.pool.as_ref()
514    }
515}
516
517///
518/// 异步任务池
519///
520pub trait AsyncTaskPool<O: Default + 'static = ()>: Default + Send + Sync + 'static {
521    type Pool: AsyncTaskPoolExt<O> + AsyncTaskPool<O>;
522
523    /// 获取绑定的线程唯一id
524    fn get_thread_id(&self) -> usize;
525
526    /// 获取当前异步任务池内任务数量
527    fn len(&self) -> usize;
528
529    /// 将异步任务加入异步任务池
530    fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
531
532    /// 将异步任务加入本地异步任务池
533    fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
534
535    /// 将指定了优先级的异步任务加入任务池
536    fn push_priority(&self,
537                     priority: usize,
538                     task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
539
540    /// 异步任务被唤醒时,将异步任务继续加入异步任务池
541    fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
542
543    /// 尝试从异步任务池中弹出一个异步任务
544    fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>>;
545
546    /// 尝试从异步任务池中弹出所有异步任务
547    fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>>;
548
549    /// 获取本地线程的唤醒器
550    fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>>;
551}
552
553///
554/// 异步任务池扩展
555///
556pub trait AsyncTaskPoolExt<O: Default + 'static = ()>: Send + Sync + 'static {
557    /// 设置待唤醒的工作者唤醒器队列
558    fn set_waits(&mut self,
559                 _waits: Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>) {}
560
561    /// 获取待唤醒的工作者唤醒器队列
562    fn get_waits(&self) -> Option<&Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>> {
563        //默认没有待唤醒的工作者唤醒器队列
564        None
565    }
566
567    /// 获取空闲的工作者的数量,这个数量大于0,表示可以新开线程来运行可分派的工作者
568    fn idler_len(&self) -> usize {
569        //默认不分派
570        0
571    }
572
573    /// 分派一个空闲的工作者
574    fn spawn_worker(&self) -> Option<usize> {
575        //默认不分派
576        None
577    }
578
579    /// 获取工作者的数量
580    fn worker_len(&self) -> usize {
581        //默认工作者数量和本机逻辑核数相同
582        #[cfg(not(target_arch = "wasm32"))]
583        return num_cpus::get();
584        #[cfg(target_arch = "wasm32")]
585        return 1;
586    }
587
588    /// 获取缓冲区的任务数量,缓冲区任务是未分配给工作者的任务
589    fn buffer_len(&self) -> usize {
590        //默认没有缓冲区
591        0
592    }
593
594    /// 设置当前绑定本地线程的唤醒器
595    fn set_thread_waker(&mut self, _thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) {
596        //默认不设置
597    }
598
599    /// 复制当前绑定本地线程的唤醒器
600    fn clone_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
601        //默认不复制
602        None
603    }
604
605    /// 关闭当前工作者
606    fn close_worker(&self) {
607        //默认不允许关闭工作者
608    }
609}
610
611///
612/// 异步运行时
613///
614pub trait AsyncRuntime<O: Default + 'static = ()>: Clone + Send + Sync + 'static {
615    type Pool: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = Self::Pool>;
616
617    /// 共享运行时内部任务池
618    fn shared_pool(&self) -> Arc<Self::Pool>;
619
620    /// 获取当前异步运行时的唯一id
621    fn get_id(&self) -> usize;
622
623    /// 获取当前异步运行时待处理任务数量
624    fn wait_len(&self) -> usize;
625
626    /// 获取当前异步运行时任务数量
627    fn len(&self) -> usize;
628
629    /// 分配异步任务的唯一id
630    fn alloc<R: 'static>(&self) -> TaskId;
631
632    /// 派发一个指定的异步任务到异步运行时
633    fn spawn<F>(&self, future: F) -> Result<TaskId>
634        where F: Future<Output = O> + Send + 'static;
635
636    /// 派发一个异步任务到本地异步运行时,如果本地没有本异步运行时,则会派发到当前运行时中
637    fn spawn_local<F>(&self, future: F) -> Result<TaskId>
638        where F: Future<Output = O> + Send + 'static;
639
640    /// 派发一个指定优先级的异步任务到异步运行时
641    fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
642        where F: Future<Output = O> + Send + 'static;
643
644    /// 派发一个异步任务到异步运行时,并立即让出任务的当前运行
645    fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
646        where F: Future<Output = O> + Send + 'static;
647
648    /// 派发一个在指定时间后执行的异步任务到异步运行时,时间单位ms
649    fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
650        where F: Future<Output = O> + Send + 'static;
651
652    /// 派发一个指定任务唯一id的异步任务到异步运行时
653    fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
654        where F: Future<Output = O> + Send + 'static;
655
656    /// 派发一个指定任务唯一id的异步任务到本地异步运行时,如果本地没有本异步运行时,则会派发到当前运行时中
657    fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
658        where F: Future<Output = O> + Send + 'static;
659
660    /// 派发一个指定任务唯一id和任务优先级的异步任务到异步运行时
661    fn spawn_priority_by_id<F>(&self,
662                               task_id: TaskId,
663                               priority: usize,
664                               future: F) -> Result<()>
665        where F: Future<Output = O> + Send + 'static;
666
667    /// 派发一个指定任务唯一id的异步任务到异步运行时,并立即让出任务的当前运行
668    fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
669        where F: Future<Output = O> + Send + 'static;
670
671    /// 派发一个指定任务唯一id和在指定时间后执行的异步任务到异步运行时,时间单位ms
672    fn spawn_timing_by_id<F>(&self,
673                             task_id: TaskId,
674                             future: F,
675                             time: usize) -> Result<()>
676        where F: Future<Output = O> + Send + 'static;
677
678    /// 挂起指定唯一id的异步任务
679    fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output>;
680
681    /// 唤醒指定唯一id的异步任务
682    fn wakeup<Output: 'static>(&self, task_id: &TaskId);
683
684    /// 挂起当前异步运行时的当前任务,并在指定的其它运行时上派发一个指定的异步任务,等待其它运行时上的异步任务完成后,唤醒当前运行时的当前任务,并返回其它运行时上的异步任务的值
685    fn wait<V: Send + 'static>(&self) -> AsyncWait<V>;
686
687    /// 挂起当前异步运行时的当前任务,并在多个其它运行时上执行多个其它任务,其中任意一个任务完成,则唤醒当前运行时的当前任务,并返回这个已完成任务的值,而其它未完成的任务的值将被忽略
688    fn wait_any<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAny<V>;
689
690    /// 挂起当前异步运行时的当前任务,并在多个其它运行时上执行多个其它任务,任务返回后需要通过用户指定的检查回调进行检查,其中任意一个任务检查通过,则唤醒当前运行时的当前任务,并返回这个已完成任务的值,而其它未完成或未检查通过的任务的值将被忽略,如果所有任务都未检查通过,则强制唤醒当前运行时的当前任务
691    fn wait_any_callback<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V>;
692
693    /// 构建用于派发多个异步任务到指定运行时的映射归并,需要指定映射归并的容量
694    fn map_reduce<V: Send + 'static>(&self, capacity: usize) -> AsyncMapReduce<V>;
695
696    /// 挂起当前异步运行时的当前任务,等待指定的时间后唤醒当前任务
697    fn timeout(&self, timeout: usize) -> BoxFuture<'static, ()>;
698
699    /// 立即让出当前任务的执行
700    fn yield_now(&self) -> BoxFuture<'static, ()>;
701
702    /// 生成一个异步管道,输入指定流,输入流的每个值通过过滤器生成输出流的值
703    fn pipeline<S, SO, F, FO>(&self, input: S, filter: F) -> BoxStream<'static, FO>
704        where S: Stream<Item = SO> + Send + 'static,
705              SO: Send + 'static,
706              F: FnMut(SO) -> AsyncPipelineResult<FO> + Send + 'static,
707              FO: Send + 'static;
708
709    /// 关闭异步运行时,返回请求关闭是否成功
710    fn close(&self) -> bool;
711}
712
713///
714/// 异步运行时扩展
715///
716pub trait AsyncRuntimeExt<O: Default + 'static = ()> {
717    /// 派发一个指定的异步任务到异步运行时,并指定异步任务的初始化上下文
718    fn spawn_with_context<F, C>(&self,
719                                task_id: TaskId,
720                                future: F,
721                                context: C) -> Result<()>
722        where F: Future<Output = O> + Send + 'static,
723              C: 'static;
724
725    /// 派发一个在指定时间后执行的异步任务到异步运行时,并指定异步任务的初始化上下文,时间单位ms
726    fn spawn_timing_with_context<F, C>(&self,
727                                       task_id: TaskId,
728                                       future: F,
729                                       context: C,
730                                       time: usize) -> Result<()>
731        where F: Future<Output = O> + Send + 'static,
732              C: Send + 'static;
733
734    /// 立即创建一个指定任务池的异步运行时,并执行指定的异步任务,阻塞当前线程,等待异步任务完成后返回
735    fn block_on<F>(&self, future: F) -> Result<F::Output>
736        where F: Future + Send + 'static,
737              <F as Future>::Output: Default + Send + 'static;
738}
739
740///
741/// 异步运行时构建器
742///
743pub struct AsyncRuntimeBuilder<O: Default + 'static = ()>(PhantomData<O>);
744
745impl<O: Default + 'static> AsyncRuntimeBuilder<O> {
746    /// 构建默认的工作者异步运行时
747    pub fn default_worker_thread(worker_name: Option<&str>,
748                                 worker_stack_size: Option<usize>,
749                                 worker_sleep_timeout: Option<u64>,
750                                 worker_loop_interval: Option<Option<u64>>) -> WorkerRuntime<O> {
751        let runner = WorkerTaskRunner::default();
752
753        let thread_name = if let Some(name) = worker_name {
754            name
755        } else {
756            //默认的线程名称
757            "Default-Single-Worker"
758        };
759        let thread_stack_size = if let Some(size) = worker_stack_size {
760            size
761        } else {
762            //默认的线程堆栈大小
763            2 * 1024 * 1024
764        };
765        let sleep_timeout = if let Some(timeout) = worker_sleep_timeout {
766            timeout
767        } else {
768            //默认的线程休眠时长
769            1
770        };
771        let loop_interval = if let Some(interval) = worker_loop_interval {
772            interval
773        } else {
774            //默认的线程循环间隔时长
775            None
776        };
777
778        //创建线程并在线程中执行异步运行时
779        let clock = Clock::new();
780        let runner_copy = runner.clone();
781        let rt_copy = runner.get_runtime();
782        let rt = runner.startup(
783            thread_name,
784            thread_stack_size,
785            sleep_timeout,
786            loop_interval,
787            move || {
788                let last = clock.recent();
789                match runner_copy.run_once() {
790                    Err(e) => {
791                        panic!("Run runner failed, reason: {:?}", e);
792                    },
793                    Ok(len) => {
794                        (len == 0,
795                         clock
796                             .recent()
797                             .duration_since(last))
798                    },
799                }
800            },
801            move || {
802                rt_copy.wait_len() + rt_copy.len()
803            },
804        );
805
806        rt
807    }
808
809    /// 构建自定义的工作者异步运行时
810    pub fn custom_worker_thread<P, F0, F1>(pool: P,
811                                           worker_handle: Arc<AtomicBool>,
812                                           worker_condvar: Arc<(AtomicBool, Mutex<()>, Condvar)>,
813                                           thread_name: &str,
814                                           thread_stack_size: usize,
815                                           sleep_timeout: u64,
816                                           loop_interval: Option<u64>,
817                                           loop_func: F0,
818                                           get_queue_len: F1) -> WorkerRuntime<O, P>
819        where P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
820              F0: Fn() -> (bool, Duration) + Send + 'static,
821              F1: Fn() -> usize + Send + 'static {
822        let runner = WorkerTaskRunner::new(pool,
823                                           worker_handle,
824                                           worker_condvar);
825
826        //创建线程并在线程中执行异步运行时
827        let rt_copy = runner.get_runtime();
828        let rt = runner.startup(
829            thread_name,
830            thread_stack_size,
831            sleep_timeout,
832            loop_interval,
833            loop_func,
834            move || {
835                rt_copy.wait_len() + get_queue_len()
836            },
837        );
838
839        rt
840    }
841
842    /// 构建默认的多线程异步运行时
843    pub fn default_multi_thread(worker_prefix: Option<&str>,
844                                worker_stack_size: Option<usize>,
845                                worker_size: Option<usize>,
846                                worker_sleep_timeout: Option<u64>) -> MultiTaskRuntime<O> {
847        let mut builder = MultiTaskRuntimeBuilder::default();
848
849        if let Some(thread_prefix) = worker_prefix {
850            builder = builder.thread_prefix(thread_prefix);
851        }
852        if let Some(thread_stack_size) = worker_stack_size {
853            builder = builder.thread_stack_size(thread_stack_size);
854        }
855        if let Some(size) = worker_size {
856            builder = builder
857                .init_worker_size(size)
858                .set_worker_limit(size, size);
859        }
860        if let Some(sleep_timeout) = worker_sleep_timeout {
861            builder = builder.set_timeout(sleep_timeout);
862        }
863
864        builder.build()
865    }
866
867    /// 构建自定义的多线程异步运行时
868    pub fn custom_multi_thread<P>(pool: P,
869                                  worker_prefix: &str,
870                                  worker_stack_size: usize,
871                                  worker_size: usize,
872                                  worker_sleep_timeout: u64,
873                                  worker_timer_interval: usize) -> MultiTaskRuntime<O, P>
874        where P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P> {
875        MultiTaskRuntimeBuilder::new(pool)
876            .thread_prefix(worker_prefix)
877            .thread_stack_size(worker_stack_size)
878            .init_worker_size(worker_size)
879            .set_worker_limit(worker_size, worker_size)
880            .set_timeout(worker_sleep_timeout)
881            .set_timer_interval(worker_timer_interval)
882            .build()
883    }
884}
885
886/// 绑定指定异步运行时到本地线程
887pub fn bind_local_thread<O: Default + 'static>(runtime: LocalAsyncRuntime<O>) {
888    match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |rt| {
889        let raw = Arc::into_raw(Arc::new(runtime)) as *mut LocalAsyncRuntime<O> as *mut ();
890        rt.store(raw, Ordering::Relaxed);
891    }) {
892        Err(e) => {
893            panic!("Bind single runtime to local thread failed, reason: {:?}", e);
894        },
895        Ok(_) => (),
896    }
897}
898
899/// 从本地线程解绑单线程异步任务执行器
900pub fn unbind_local_thread() {
901    let _ = PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |rt| {
902        rt.store(null_mut(), Ordering::Relaxed);
903    });
904}
905
906///
907/// 本地线程绑定的异步运行时
908///
909pub struct LocalAsyncRuntime<O: Default + 'static> {
910    inner:              *const (),                                                  //内部运行时指针
911    get_id_func:        fn(*const ()) -> usize,                                     //获取本地运行时的id的函数
912    spawn_func:         fn(*const (), BoxFuture<'static, O>) -> Result<()>,         //派发函数
913    spawn_timing_func:  fn(*const (), BoxFuture<'static, O>, usize) -> Result<()>,  //定时派发函数
914    timeout_func:       fn(*const (), usize) -> BoxFuture<'static, ()>,             //超时函数
915}
916
917unsafe impl<O: Default + 'static> Send for LocalAsyncRuntime<O> {}
918unsafe impl<O: Default + 'static> Sync for LocalAsyncRuntime<O> {}
919
920impl<O: Default + 'static> LocalAsyncRuntime<O> {
921    /// 创建本地线程绑定的异步运行时
922    pub fn new(inner: *const (),
923               get_id_func: fn(*const ()) -> usize,
924               spawn_func: fn(*const (), BoxFuture<'static, O>) -> Result<()>,
925               spawn_timing_func: fn(*const (), BoxFuture<'static, O>, usize) -> Result<()>,
926               timeout_func: fn(*const (), usize) -> BoxFuture<'static, ()>) -> Self {
927        LocalAsyncRuntime {
928            inner,
929            get_id_func,
930            spawn_func,
931            spawn_timing_func,
932            timeout_func,
933        }
934    }
935
936    /// 获取本地运行时的id
937    #[inline]
938    pub fn get_id(&self) -> usize {
939        (self.get_id_func)(self.inner)
940    }
941
942    /// 派发一个指定的异步任务到本地线程绑定的异步运行时
943    #[inline]
944    pub fn spawn<F>(&self, future: F) -> Result<()>
945        where F: Future<Output = O> + Send + 'static {
946        (self.spawn_func)(self.inner, async move {
947            future.await
948        }.boxed())
949    }
950
951    /// 定时派发一个指定的异步任务到本地线程绑定的异步运行时
952    #[inline]
953    pub fn sapwn_timing_func<F>(&self, future: F, timeout: usize) -> Result<()>
954        where F: Future<Output = O> + Send + 'static {
955        (self.spawn_timing_func)(self.inner,
956                                 async move {
957                                     future.await
958                                 }.boxed(),
959                                 timeout)
960    }
961
962    /// 挂起本地线程绑定的异步运行时的当前任务,等待指定的时间后唤醒当前任务
963    #[inline]
964    pub fn timeout(&self, timeout: usize) -> BoxFuture<'static, ()> {
965        (self.timeout_func)(self.inner, timeout)
966    }
967}
968
969///
970/// 获取本地线程绑定的异步运行时
971/// 注意:O如果与本地线程绑定的运行时的O不相同,则无法获取本地线程绑定的运行时
972///
973pub fn local_async_runtime<O: Default + 'static>() -> Option<Arc<LocalAsyncRuntime<O>>> {
974    match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |ptr| {
975        let raw = ptr.load(Ordering::Relaxed) as *const LocalAsyncRuntime<O>;
976        unsafe {
977            if raw.is_null() {
978                //本地线程未绑定异步运行时
979                None
980            } else {
981                //本地线程已绑定异步运行时
982                let shared: Arc<LocalAsyncRuntime<O>> = unsafe { Arc::from_raw(raw) };
983                let result = shared.clone();
984                Arc::into_raw(shared); //避免提前释放
985                Some(result)
986            }
987        }
988    }) {
989        Err(_) => None, //本地线程没有绑定异步运行时
990        Ok(rt) => rt,
991    }
992}
993
994///
995/// 派发任务到本地线程绑定的异步运行时,如果本地线程没有异步运行时,则返回错误
996/// 注意:F::Output如果与本地线程绑定的运行时的O不相同,则无法执行指定任务
997///
998pub fn spawn_local<O, F>(future: F) -> Result<()>
999    where O: Default + 'static,
1000          F: Future<Output = O> + Send + 'static {
1001    if let Some(rt) = local_async_runtime::<O>() {
1002        rt.spawn(future)
1003    } else {
1004        Err(Error::new(ErrorKind::Other, format!("Spawn task to local thread failed, reason: runtime not exist")))
1005    }
1006}
1007
1008///
1009/// 从本地线程绑定的字典中获取指定类型的值的只读引用
1010///
1011pub fn get_local_dict<T: 'static>() -> Option<&'static T> {
1012    match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME_DICT.try_with(move |dict| {
1013        unsafe {
1014            if let Some(any) = (&*dict.get()).get(&TypeId::of::<T>()) {
1015                //指定类型的值存在
1016                <dyn Any>::downcast_ref::<T>(&**any)
1017            } else {
1018                //指定类型的值不存在
1019                None
1020            }
1021        }
1022    }) {
1023        Err(_) => {
1024            None
1025        },
1026        Ok(result) => {
1027            result
1028        }
1029    }
1030}
1031
1032///
1033/// 从本地线程绑定的字典中获取指定类型的值的可写引用
1034///
1035pub fn get_local_dict_mut<T: 'static>() -> Option<&'static mut T> {
1036    match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME_DICT.try_with(move |dict| {
1037        unsafe {
1038            if let Some(any) = (&mut *dict.get()).get_mut(&TypeId::of::<T>()) {
1039                //指定类型的值存在
1040                <dyn Any>::downcast_mut::<T>(&mut **any)
1041            } else {
1042                //指定类型的值不存在
1043                None
1044            }
1045        }
1046    }) {
1047        Err(_) => {
1048            None
1049        },
1050        Ok(result) => {
1051            result
1052        }
1053    }
1054}
1055
1056///
1057/// 在本地线程绑定的字典中设置指定类型的值,返回上一个设置的值
1058///
1059pub fn set_local_dict<T: 'static>(value: T) -> Option<T> {
1060    match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME_DICT.try_with(move |dict| {
1061        unsafe {
1062            let result = if let Some(any) = (&mut *dict.get()).remove(&TypeId::of::<T>()) {
1063                //指定类型的上一个值存在
1064                if let Ok(r) = any.downcast() {
1065                    //造型成功,则返回
1066                    Some(*r)
1067                } else {
1068                    None
1069                }
1070            } else {
1071                //指定类型的上一个值不存在
1072                None
1073            };
1074
1075            //设置指定类型的新值
1076            (&mut *dict.get()).insert(TypeId::of::<T>(), Box::new(value) as Box<dyn Any>);
1077
1078            result
1079        }
1080    }) {
1081        Err(_) => {
1082            None
1083        },
1084        Ok(result) => {
1085            result
1086        }
1087    }
1088}
1089
1090///
1091/// 在本地线程绑定的字典中移除指定类型的值,并返回移除的值
1092///
1093pub fn remove_local_dict<T: 'static>() -> Option<T> {
1094    match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME_DICT.try_with(move |dict| {
1095        unsafe {
1096            if let Some(any) = (&mut *dict.get()).remove(&TypeId::of::<T>()) {
1097                //指定类型的上一个值存在
1098                if let Ok(r) = any.downcast() {
1099                    //造型成功,则返回
1100                    Some(*r)
1101                } else {
1102                    None
1103                }
1104            } else {
1105                //指定类型的上一个值不存在
1106                None
1107            }
1108        }
1109    }) {
1110        Err(_) => {
1111            None
1112        },
1113        Ok(result) => {
1114            result
1115        }
1116    }
1117}
1118
1119///
1120/// 清空本地线程绑定的字典
1121///
1122pub fn clear_local_dict() -> Result<()> {
1123    match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME_DICT.try_with(move |dict| {
1124        unsafe {
1125            (&mut *dict.get()).clear();
1126        }
1127    }) {
1128        Err(e) => {
1129            Err(Error::new(ErrorKind::Other, format!("Clear local dict failed, reason: {:?}", e)))
1130        },
1131        Ok(_) => {
1132            Ok(())
1133        }
1134    }
1135}
1136
1137///
1138/// 同步非阻塞的异步值,只允许被同步非阻塞的设置一次值
1139///
1140pub struct AsyncValue<V: Send + 'static>(Arc<InnerAsyncValue<V>>);
1141
1142unsafe impl<V: Send + 'static> Send for AsyncValue<V> {}
1143unsafe impl<V: Send + 'static> Sync for AsyncValue<V> {}
1144
1145impl<V: Send + 'static> Clone for AsyncValue<V> {
1146    fn clone(&self) -> Self {
1147        AsyncValue(self.0.clone())
1148    }
1149}
1150
1151impl<V: Send + 'static> Debug for AsyncValue<V> {
1152    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1153        write!(f,
1154               "AsyncValue[status = {}]",
1155               self.0.status.load(Ordering::Acquire))
1156    }
1157}
1158
1159impl<V: Send + 'static> Future for AsyncValue<V> {
1160    type Output = V;
1161
1162    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1163        let mut spin_len = 1;
1164        while self.0.status.load(Ordering::Acquire) == 2 {
1165            //还未完成设置值,则自旋等待
1166            spin_len = spin(spin_len);
1167        }
1168
1169        if self.0.status.load(Ordering::Acquire) == 3 {
1170            if let Some(value) = unsafe { (*(&self).0.value.get()).take() } {
1171                //异步值已就绪
1172                return Poll::Ready(value);
1173            }
1174        }
1175
1176        unsafe {
1177            *self.0.waker.get() = Some(cx.waker().clone()); //设置异步值的唤醒器
1178        }
1179
1180        let mut spin_len = 1;
1181        loop {
1182            match self.0.status.compare_exchange(0,
1183                                                 1, Ordering::Acquire,
1184                                                 Ordering::Relaxed) {
1185                Err(2) => {
1186                    //异步值准备设置值,则稍后重试
1187                    spin_len = spin(spin_len);
1188                    continue;
1189                },
1190                Err(3) => {
1191                    //异步值已就绪
1192                    let value = unsafe { (*(&self).0.value.get()).take().unwrap() };
1193                    return Poll::Ready(value);
1194                },
1195                Err(_) => {
1196                    unimplemented!();
1197                },
1198                Ok(_) => {
1199                    //异步值等待设置后唤醒
1200                    return Poll::Pending;
1201                },
1202            }
1203        }
1204    }
1205}
1206
1207/*
1208* 同步非阻塞的异步值同步方法
1209*/
1210impl<V: Send + 'static> AsyncValue<V> {
1211    /// 构建异步值,默认值为未就绪
1212    pub fn new() -> Self {
1213        let inner = InnerAsyncValue {
1214            value: UnsafeCell::new(None),
1215            waker: UnsafeCell::new(None),
1216            status: AtomicU8::new(0),
1217        };
1218
1219        AsyncValue(Arc::new(inner))
1220    }
1221
1222    /// 判断异步值是否已完成设置
1223    pub fn is_complete(&self) -> bool {
1224        self
1225            .0
1226            .status
1227            .load(Ordering::Relaxed) == 3
1228    }
1229
1230    /// 设置异步值
1231    pub fn set(self, value: V) {
1232        loop {
1233            match self.0.status.compare_exchange(1,
1234                                                 2,
1235                                                 Ordering::Acquire,
1236                                                 Ordering::Relaxed) {
1237                Err(0) => {
1238                    match self.0.status.compare_exchange(0,
1239                                                         2,
1240                                                         Ordering::Acquire,
1241                                                         Ordering::Relaxed) {
1242                        Err(1) => {
1243                            //异步值的唤醒器已就绪,则继续尝试获取锁
1244                            continue;
1245                        },
1246                        Err(_) => {
1247                            //异步值正在设置或已完成设置,则立即返回
1248                            return;
1249                        },
1250                        Ok(_) => {
1251                            //异步值的唤醒器未就绪且获取到锁,则设置异步值后将状态设置为已完成设置,并立即返回
1252                            unsafe { *self.0.value.get() = Some(value); }
1253                            self.0.status.store(3, Ordering::Release);
1254                            return;
1255                        }
1256                     }
1257                },
1258                Err(_) => {
1259                    //异步值正在设置或已完成设置,则立即返回
1260                    return;
1261                },
1262                Ok(_) => {
1263                    //异步值的唤醒器已就绪且获取到锁,则立即退出自旋
1264                    break;
1265                }
1266            }
1267        }
1268
1269        //已锁且获取到锁,则设置异步值,将状态设置为已完成设置,并立即唤醒异步值
1270        unsafe { *self.0.value.get() = Some(value); }
1271        self.0.status.store(3, Ordering::Release);
1272        let waker = unsafe { (*self.0.waker.get()).take().unwrap() };
1273        waker.wake();
1274    }
1275}
1276
1277// 同步非阻塞的内部异步值,只允许被同步非阻塞的设置一次值
1278pub struct InnerAsyncValue<V: Send + 'static> {
1279    value:  UnsafeCell<Option<V>>,      //值
1280    waker:  UnsafeCell<Option<Waker>>,  //唤醒器
1281    status: AtomicU8,                   //状态
1282}
1283
1284///
1285/// 异步非阻塞可变值的守护者
1286///
1287pub struct AsyncVariableGuard<'a, V: Send + 'static> {
1288    value:  &'a UnsafeCell<Option<V>>,      //值
1289    waker:  &'a UnsafeCell<Option<Waker>>,  //唤醒器
1290    status: &'a AtomicU8,                   //值状态
1291}
1292
1293unsafe impl<V: Send + 'static> Send for AsyncVariableGuard<'_, V> {}
1294
1295impl<V: Send + 'static> Drop for AsyncVariableGuard<'_, V> {
1296    fn drop(&mut self) {
1297        //当前异步可变值已锁定,则解除锁定
1298        //当前异步可变值的状态为2或6,表示当前异步可变值的唤醒器未就绪并已锁定,或当前异步可变值不需要唤醒并已完成所有修改
1299        //当前异步可变值的状态为3或7,表示当前异步可变值的唤醒器已就绪并已锁定,或当前异步可变值已唤醒并已完成所有修改
1300        self.status.fetch_sub(2, Ordering::Relaxed);
1301    }
1302}
1303
1304impl<V: Send + 'static> Deref for AsyncVariableGuard<'_, V> {
1305    type Target = Option<V>;
1306
1307    fn deref(&self) -> &Self::Target {
1308        unsafe {
1309            &*self.value.get()
1310        }
1311    }
1312}
1313
1314impl<V: Send + 'static> DerefMut for AsyncVariableGuard<'_, V> {
1315    fn deref_mut(&mut self) -> &mut Self::Target {
1316        unsafe {
1317            &mut *self.value.get()
1318        }
1319    }
1320}
1321
1322impl<V: Send + 'static> AsyncVariableGuard<'_, V> {
1323    /// 完成异步可变值的修改
1324    pub fn finish(self) {
1325        //设置异步可变值的状态为已完成修改
1326        if self.status.fetch_add(4, Ordering::Relaxed) == 3 {
1327            if let Some(waker) = unsafe { (&mut *self.waker.get()).take() } {
1328                //当前异步可变值需要唤醒,则立即唤醒异步可变值
1329                waker.wake();
1330            }
1331        }
1332    }
1333}
1334
1335///
1336/// 异步非阻塞可变值,在完成前允许被同步非阻塞的修改多次
1337///
1338pub struct AsyncVariable<V: Send + 'static>(Arc<InnerAsyncVariable<V>>);
1339
1340unsafe impl<V: Send + 'static> Send for AsyncVariable<V> {}
1341unsafe impl<V: Send + 'static> Sync for AsyncVariable<V> {}
1342
1343impl<V: Send + 'static> Clone for AsyncVariable<V> {
1344    fn clone(&self) -> Self {
1345        AsyncVariable(self.0.clone())
1346    }
1347}
1348
1349impl<V: Send + 'static> Future for AsyncVariable<V> {
1350    type Output = V;
1351
1352    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1353        unsafe {
1354            *self.0.waker.get() = Some(cx.waker().clone()); //设置异步可变值的唤醒器准备就绪
1355        }
1356
1357        let mut spin_len = 1;
1358        loop {
1359            match self.0.status.compare_exchange(0,
1360                                                 1,
1361                                                 Ordering::Acquire,
1362                                                 Ordering::Relaxed) {
1363                Err(current) if current & 4 != 0 => {
1364                    //异步可变值已完成所有修改,则立即返回
1365                    unsafe {
1366                        let _ = (&mut *self.0.waker.get()).take(); //释放异步可变值的唤醒器
1367                        return Poll::Ready((&mut *(&self).0.value.get()).take().unwrap());
1368                    }
1369                },
1370                Err(_) => {
1371                    //还未完成值修改,则自旋等待
1372                    spin_len = spin(spin_len);
1373                },
1374                Ok(_) => {
1375                    //异步可变值已挂起
1376                    return Poll::Pending;
1377                },
1378            }
1379        }
1380    }
1381}
1382
1383impl<V: Send + 'static> AsyncVariable<V> {
1384    /// 构建异步可变值,默认值为未就绪
1385    pub fn new() -> Self {
1386        let inner = InnerAsyncVariable {
1387            value: UnsafeCell::new(None),
1388            waker: UnsafeCell::new(None),
1389            status: AtomicU8::new(0),
1390        };
1391
1392        AsyncVariable(Arc::new(inner))
1393    }
1394
1395    /// 判断异步可变值是否已完成设置
1396    pub fn is_complete(&self) -> bool {
1397        self
1398            .0
1399            .status
1400            .load(Ordering::Acquire) & 4 != 0
1401    }
1402
1403    /// 锁住待修改的异步可变值,并返回当前异步可变值的守护者,如果异步可变值已完成修改则返回空
1404    pub fn lock(&self) -> Option<AsyncVariableGuard<V>> {
1405        let mut spin_len = 1;
1406        loop {
1407            match self
1408                .0
1409                .status
1410                .compare_exchange(1,
1411                                  3,
1412                                  Ordering::Acquire,
1413                                  Ordering::Relaxed) {
1414                Err(0) => {
1415                    //异步可变值还未就绪,则自旋等待
1416                    match self
1417                        .0
1418                        .status
1419                        .compare_exchange(0,
1420                                          2,
1421                                          Ordering::Acquire,
1422                                          Ordering::Relaxed) {
1423                        Err(1) => {
1424                            //异步可变值已就绪,则继续尝试获取锁
1425                            continue;
1426                        },
1427                        Err(2) => {
1428                            //异步可变值的唤醒器未就绪且已锁,但未获取到锁,则自旋等待
1429                            spin_len = spin(spin_len);
1430                        },
1431                        Err(3) => {
1432                            //异步可变值的唤醒器已就绪且已锁,但未获取到锁,则自旋等待
1433                            spin_len = spin(spin_len);
1434                        },
1435                        Err(_) => {
1436                            //已完成,则返回空
1437                            return None;
1438                        },
1439                        Ok(_) => {
1440                            //异步可变值的唤醒器未就绪且获取到锁,则返回异步可变值的守护者
1441                            let guard = AsyncVariableGuard {
1442                                value: &self.0.value,
1443                                waker: &self.0.waker,
1444                                status: &self.0.status,
1445                            };
1446
1447                            return Some(guard)
1448                        },
1449                    }
1450                },
1451                Err(2) => {
1452                    //异步可变值的唤醒器未就绪且已锁,但未获取到锁,则自旋等待
1453                    spin_len = spin(spin_len);
1454                },
1455                Err(3) => {
1456                    //异步可变值的唤醒器已就绪且已锁,但未获取到锁,则自旋等待
1457                    spin_len = spin(spin_len);
1458                },
1459                Err(_) => {
1460                    //已完成,则返回空
1461                    return None;
1462                }
1463                Ok(_) => {
1464                    //异步可变值的唤醒器已就绪且获取到锁,则返回异步可变值的守护者
1465                    let guard = AsyncVariableGuard {
1466                        value: &self.0.value,
1467                        waker: &self.0.waker,
1468                        status: &self.0.status,
1469                    };
1470
1471                    return Some(guard)
1472                },
1473            }
1474        }
1475    }
1476}
1477
1478// 内部异步非阻塞可变值,在完成前允许被同步非阻塞的修改多次
1479pub struct InnerAsyncVariable<V: Send + 'static> {
1480    value:  UnsafeCell<Option<V>>,      //值
1481    waker:  UnsafeCell<Option<Waker>>,  //唤醒器
1482    status: AtomicU8,                   //状态
1483}
1484
1485///
1486/// 等待异步任务运行的结果
1487///
1488pub struct AsyncWaitResult<V: Send + 'static>(pub Arc<RefCell<Option<Result<V>>>>);
1489
1490unsafe impl<V: Send + 'static> Send for AsyncWaitResult<V> {}
1491unsafe impl<V: Send + 'static> Sync for AsyncWaitResult<V> {}
1492
1493impl<V: Send + 'static> Clone for AsyncWaitResult<V> {
1494    fn clone(&self) -> Self {
1495        AsyncWaitResult(self.0.clone())
1496    }
1497}
1498
1499///
1500/// 等待异步任务运行的结果集
1501///
1502pub struct AsyncWaitResults<V: Send + 'static>(pub Arc<RefCell<Option<Vec<Result<V>>>>>);
1503
1504unsafe impl<V: Send + 'static> Send for AsyncWaitResults<V> {}
1505unsafe impl<V: Send + 'static> Sync for AsyncWaitResults<V> {}
1506
1507impl<V: Send + 'static> Clone for AsyncWaitResults<V> {
1508    fn clone(&self) -> Self {
1509        AsyncWaitResults(self.0.clone())
1510    }
1511}
1512
1513///
1514/// 异步定时器任务
1515///
1516pub enum AsyncTimingTask<
1517    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1518    O: Default + 'static = (),
1519> {
1520    Pended(TaskId),                 //已挂起的定时任务
1521    WaitRun(Arc<AsyncTask<P, O>>),  //等待执行的定时任务
1522}
1523
1524///
1525/// 异步任务本地定时器
1526///
1527pub struct AsyncTaskTimer<
1528    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1529    O: Default + 'static = (),
1530> {
1531    producor:   Sender<(usize, AsyncTimingTask<P, O>)>,                     //定时任务生产者
1532    consumer:   Receiver<(usize, AsyncTimingTask<P, O>)>,                   //定时任务消费者
1533    timer:      Arc<RefCell<Timer<AsyncTimingTask<P, O>, 1000, 60, 3>>>,    //定时器
1534    clock:      Clock,                                                      //定时器时钟
1535    now:        QInstant,                                                   //当前时间
1536}
1537
1538unsafe impl<
1539    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1540    O: Default + 'static,
1541> Send for AsyncTaskTimer<P, O> {}
1542unsafe impl<
1543    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1544    O: Default + 'static,
1545> Sync for AsyncTaskTimer<P, O> {}
1546
1547impl<
1548    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1549    O: Default + 'static,
1550> AsyncTaskTimer<P, O> {
1551    /// 构建异步任务本地定时器
1552    pub fn new() -> Self {
1553        let (producor, consumer) = unbounded();
1554        let clock = Clock::new();
1555        let now = clock.recent();
1556
1557        AsyncTaskTimer {
1558            producor,
1559            consumer,
1560            timer: Arc::new(RefCell::new(Timer::<AsyncTimingTask<P, O>, 1000, 60, 3>::default())),
1561            clock,
1562            now,
1563        }
1564    }
1565
1566    /// 获取定时任务生产者
1567    #[inline]
1568    pub fn get_producor(&self) -> &Sender<(usize, AsyncTimingTask<P, O>)> {
1569        &self.producor
1570    }
1571
1572    /// 获取剩余未到期的定时器任务数量
1573    #[inline]
1574    pub fn len(&self) -> usize {
1575        let timer = self.timer.as_ref().borrow();
1576        timer.add_count() - timer.remove_count()
1577    }
1578
1579    /// 设置定时器
1580    pub fn set_timer(&self, task: AsyncTimingTask<P, O>, timeout: usize) -> usize {
1581        let current_time = self
1582            .clock
1583            .recent()
1584            .duration_since(self.now)
1585            .as_millis() as u64;
1586        self
1587            .timer
1588            .borrow_mut()
1589            .push_time(current_time + timeout as u64, task)
1590            .data()
1591            .as_ffi() as usize
1592    }
1593
1594    /// 取消定时器
1595    pub fn cancel_timer(&self, timer_ref: usize) -> Option<AsyncTimingTask<P, O>> {
1596        if let Some(item) = self
1597            .timer
1598            .borrow_mut()
1599            .cancel(KeyData::from_ffi(timer_ref as u64).into()) {
1600            Some(item)
1601        } else {
1602            None
1603        }
1604    }
1605
1606    /// 消费所有定时任务,返回定时任务数量
1607    pub fn consume(&self) -> usize {
1608        let timer_tasks = self.consumer.try_iter().collect::<Vec<(usize, AsyncTimingTask<P, O>)>>();
1609        let len = timer_tasks.len();
1610        for (timeout, task) in timer_tasks {
1611            self.set_timer(task, timeout);
1612        }
1613
1614        len
1615    }
1616
1617    /// 判断当前时间是否有可以弹出的任务,如果有可以弹出的任务,则返回当前时间,否则返回空
1618    pub fn is_require_pop(&self) -> Option<u64> {
1619        let current_time = self
1620            .clock
1621            .recent()
1622            .duration_since(self.now)
1623            .as_millis() as u64;
1624        if self.timer.borrow_mut().is_ok(current_time) {
1625            Some(current_time)
1626        } else {
1627            None
1628        }
1629    }
1630
1631    /// 从定时器中弹出指定时间的一个到期任务
1632    pub fn pop(&self, current_time: u64) -> Option<(usize, AsyncTimingTask<P, O>)> {
1633        if let Some((key, item)) = self.timer.borrow_mut().pop_kv(current_time) {
1634            Some((key.data().as_ffi() as usize, item))
1635        } else {
1636            None
1637        }
1638    }
1639}
1640
1641///
1642/// 异步任务本地定时器,不支持取消定时任务
1643///
1644pub struct AsyncTaskTimerByNotCancel<
1645    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1646    O: Default + 'static = (),
1647> {
1648    producor:   Sender<(usize, AsyncTimingTask<P, O>)>,                             //定时任务生产者
1649    consumer:   Receiver<(usize, AsyncTimingTask<P, O>)>,                           //定时任务消费者
1650    timer:      Arc<RefCell<NotCancelTimer<AsyncTimingTask<P, O>, 1000, 60, 3>>>,   //定时器
1651    clock:      Clock,                                                              //定时器时钟
1652    now:        QInstant,                                                           //当前时间
1653}
1654
1655unsafe impl<
1656    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1657    O: Default + 'static,
1658> Send for AsyncTaskTimerByNotCancel<P, O> {}
1659unsafe impl<
1660    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1661    O: Default + 'static,
1662> Sync for AsyncTaskTimerByNotCancel<P, O> {}
1663
1664impl<
1665    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1666    O: Default + 'static,
1667> AsyncTaskTimerByNotCancel<P, O> {
1668    /// 构建异步任务本地定时器
1669    pub fn new() -> Self {
1670        let (producor, consumer) = unbounded();
1671        let clock = Clock::new();
1672        let now = clock.recent();
1673
1674        AsyncTaskTimerByNotCancel {
1675            producor,
1676            consumer,
1677            timer: Arc::new(RefCell::new(NotCancelTimer::<AsyncTimingTask<P, O>, 1000, 60, 3>::default())),
1678            clock,
1679            now,
1680        }
1681    }
1682
1683    /// 获取定时任务生产者
1684    #[inline]
1685    pub fn get_producor(&self) -> &Sender<(usize, AsyncTimingTask<P, O>)> {
1686        &self.producor
1687    }
1688
1689    /// 获取剩余未到期的定时器任务数量
1690    #[inline]
1691    pub fn len(&self) -> usize {
1692        let timer = self.timer.as_ref().borrow();
1693        timer.add_count() - timer.remove_count()
1694    }
1695
1696    /// 设置定时器
1697    pub fn set_timer(&self, task: AsyncTimingTask<P, O>, timeout: usize) {
1698        self
1699            .timer
1700            .borrow_mut()
1701            .push(timeout, task);
1702    }
1703
1704    /// 消费所有定时任务,返回定时任务数量
1705    pub fn consume(&self) -> usize {
1706        let timer_tasks = self.consumer.try_iter().collect::<Vec<(usize, AsyncTimingTask<P, O>)>>();
1707        let len = timer_tasks.len();
1708        for (timeout, task) in timer_tasks {
1709            self.set_timer(task, timeout);
1710        }
1711
1712        len
1713    }
1714
1715    /// 判断当前时间是否有可以弹出的任务,如果有可以弹出的任务,则返回当前时间,否则返回空
1716    pub fn is_require_pop(&self) -> Option<u64> {
1717        let current_time = self
1718            .clock
1719            .recent()
1720            .duration_since(self.now)
1721            .as_millis() as u64;
1722        if self.timer.borrow_mut().is_ok(current_time) {
1723            Some(current_time)
1724        } else {
1725            None
1726        }
1727    }
1728
1729    /// 从定时器中弹出指定时间的一个到期任务
1730    pub fn pop(&self, current_time: u64) -> Option<AsyncTimingTask<P, O>> {
1731        if let Some(item) = self.timer.borrow_mut().pop(current_time) {
1732            Some(item)
1733        } else {
1734            None
1735        }
1736    }
1737}
1738
1739///
1740/// 等待指定超时
1741///
1742pub struct AsyncWaitTimeout<
1743    RT: AsyncRuntime<O>,
1744    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1745    O: Default + 'static = (),
1746> {
1747    rt:         RT,                                     //当前运行时
1748    producor:   Sender<(usize, AsyncTimingTask<P, O>)>, //超时请求生产者
1749    timeout:    usize,                                  //超时时长,单位ms
1750    expired:    AtomicBool,                             //是否已过期
1751}
1752
1753unsafe impl<
1754    RT: AsyncRuntime<O>,
1755    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1756    O: Default + 'static,
1757> Send for AsyncWaitTimeout<RT, P, O> {}
1758unsafe impl<
1759    RT: AsyncRuntime<O>,
1760    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1761    O: Default + 'static,
1762> Sync for AsyncWaitTimeout<RT, P, O> {}
1763
1764impl<
1765    RT: AsyncRuntime<O>,
1766    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1767    O: Default + 'static,
1768> Future for AsyncWaitTimeout<RT, P, O> {
1769    type Output = ();
1770
1771    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1772        if (&self).expired.load(Ordering::Relaxed) {
1773            //已到期,则返回
1774            return Poll::Ready(());
1775        } else {
1776            //未到期,则设置为已到期
1777            (&self).expired.store(true, Ordering::Relaxed);
1778        }
1779
1780        let task_id = self.rt.alloc::<O>();
1781        let reply = self.rt.pending(&task_id, cx.waker().clone());
1782
1783        //发送超时请求,并返回
1784        let r = (&self).producor.send(((&self).timeout, AsyncTimingTask::Pended(task_id.clone())));
1785        reply
1786    }
1787}
1788
1789impl<
1790    RT: AsyncRuntime<O>,
1791    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1792    O: Default + 'static,
1793> AsyncWaitTimeout<RT, P, O> {
1794    /// 构建等待指定超时任务的方法
1795    pub fn new(rt: RT,
1796               producor: Sender<(usize, AsyncTimingTask<P, O>)>,
1797               timeout: usize) -> Self {
1798        AsyncWaitTimeout {
1799            rt,
1800            producor,
1801            timeout,
1802            expired: AtomicBool::new(false), //设置初始值
1803        }
1804    }
1805}
1806
1807///
1808/// 本地等待指定超时
1809///
1810pub struct LocalAsyncWaitTimeout<
1811    RT: AsyncRuntime<O>,
1812    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1813    O: Default + 'static = (),
1814> {
1815    rt:         RT,                                     //当前运行时
1816    timer:      Arc<AsyncTaskTimerByNotCancel<P, O>>,   //定时器
1817    timeout:    usize,                                  //超时时长,单位ms
1818    expired:    AtomicBool,                             //是否已过期
1819}
1820
1821unsafe impl<
1822    RT: AsyncRuntime<O>,
1823    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1824    O: Default + 'static,
1825> Send for LocalAsyncWaitTimeout<RT, P, O> {}
1826unsafe impl<
1827    RT: AsyncRuntime<O>,
1828    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1829    O: Default + 'static,
1830> Sync for LocalAsyncWaitTimeout<RT, P, O> {}
1831
1832impl<
1833    RT: AsyncRuntime<O>,
1834    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1835    O: Default + 'static,
1836> Future for LocalAsyncWaitTimeout<RT, P, O> {
1837    type Output = ();
1838
1839    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1840        if (&self).expired.load(Ordering::Relaxed) {
1841            //已到期,则返回
1842            return Poll::Ready(());
1843        } else {
1844            //未到期,则设置为已到期
1845            (&self).expired.store(true, Ordering::Relaxed);
1846        }
1847
1848        let task_id = self.rt.alloc::<O>();
1849        let reply = self.rt.pending(&task_id, cx.waker().clone());
1850
1851        //设置本地超时请求,并返回
1852        (&self)
1853            .timer
1854            .set_timer(AsyncTimingTask::Pended(task_id.clone()),
1855                       (&self).timeout);
1856        reply
1857    }
1858}
1859
1860impl<
1861    RT: AsyncRuntime<O>,
1862    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1863    O: Default + 'static,
1864> LocalAsyncWaitTimeout<RT, P, O> {
1865    /// 构建等待指定超时任务的方法
1866    pub fn new(rt: RT,
1867               timer: Arc<AsyncTaskTimerByNotCancel<P, O>>,
1868               timeout: usize) -> Self {
1869        LocalAsyncWaitTimeout {
1870            rt,
1871            timer,
1872            timeout,
1873            expired: AtomicBool::new(false), //设置初始值
1874        }
1875    }
1876}
1877
1878///
1879/// 等待异步任务执行完成
1880///
1881pub struct AsyncWait<V: Send + 'static>(AsyncWaitAny<V>);
1882
1883unsafe impl<V: Send + 'static> Send for AsyncWait<V> {}
1884unsafe impl<V: Send + 'static> Sync for AsyncWait<V> {}
1885
1886/*
1887* 等待异步任务执行完成同步方法
1888*/
1889impl<V: Send + 'static> AsyncWait<V> {
1890    /// 派发指定超时时间的指定任务到指定的运行时,并返回派发是否成功
1891    pub fn spawn<RT, O, F>(&self,
1892                           rt: RT,
1893                           timeout: Option<usize>,
1894                           future: F) -> Result<()>
1895        where RT: AsyncRuntime<O>,
1896              O: Default + 'static,
1897              F: Future<Output = Result<V>> + Send + 'static {
1898        self.0.spawn(rt.clone(), future)?;
1899
1900        if let Some(timeout) = timeout {
1901            //设置了超时时间
1902            let rt_copy = rt.clone();
1903            self.0.spawn(rt, async move {
1904                rt_copy.timeout(timeout).await;
1905
1906                //返回超时错误
1907                Err(Error::new(ErrorKind::TimedOut, format!("Time out")))
1908            })
1909        } else {
1910            //未设置超时时间
1911            Ok(())
1912        }
1913    }
1914
1915    /// 派发指定超时时间的指定任务到本地运行时,并返回派发是否成功
1916    pub fn spawn_local<O, F>(&self,
1917                             timeout: Option<usize>,
1918                             future: F) -> Result<()>
1919        where O: Default + 'static,
1920              F: Future<Output = Result<V>> + Send + 'static {
1921        if let Some(rt) = local_async_runtime::<O>() {
1922            //当前线程有绑定运行时
1923            self.0.spawn_local(future)?;
1924
1925            if let Some(timeout) = timeout {
1926                //设置了超时时间
1927                let rt_copy = rt.clone();
1928                self.0.spawn_local(async move {
1929                    rt_copy.timeout(timeout).await;
1930
1931                    //返回超时错误
1932                    Err(Error::new(ErrorKind::TimedOut, format!("Time out")))
1933                })
1934            } else {
1935                //未设置超时时间
1936                Ok(())
1937            }
1938        } else {
1939            //当前线程未绑定运行时
1940            Err(Error::new(ErrorKind::Other, format!("Spawn wait task failed, reason: local async runtime not exist")))
1941        }
1942    }
1943}
1944
1945/*
1946* 等待异步任务执行完成异步方法
1947*/
1948impl<V: Send + 'static> AsyncWait<V> {
1949    /// 异步等待已派发任务的结果
1950    pub async fn wait_result(self) -> Result<V> {
1951        self.0.wait_result().await
1952    }
1953}
1954
1955///
1956/// 等待任意异步任务执行完成
1957///
1958pub struct AsyncWaitAny<V: Send + 'static> {
1959    capacity:       usize,                      //派发任务的容量
1960    producor:       AsyncSender<Result<V>>,     //异步返回值生成器
1961    consumer:       AsyncReceiver<Result<V>>,   //异步返回值接收器
1962}
1963
1964unsafe impl<V: Send + 'static> Send for AsyncWaitAny<V> {}
1965unsafe impl<V: Send + 'static> Sync for AsyncWaitAny<V> {}
1966
1967/*
1968* 等待任意异步任务执行完成同步方法
1969*/
1970impl<V: Send + 'static> AsyncWaitAny<V> {
1971    /// 派发指定任务到指定的运行时,并返回派发是否成功
1972    pub fn spawn<RT, O, F>(&self,
1973                           rt: RT,
1974                           future: F) -> Result<()>
1975        where RT: AsyncRuntime<O>,
1976              O: Default + 'static,
1977              F: Future<Output = Result<V>> + Send + 'static {
1978        let producor = self.producor.clone();
1979        rt.spawn_by_id(rt.alloc::<O>(), async move {
1980            let value = future.await;
1981            producor.into_send_async(value).await;
1982
1983            //返回异步任务的默认值
1984            Default::default()
1985        })
1986    }
1987
1988    /// 派发指定任务到本地运行时,并返回派发是否成功
1989    pub fn spawn_local<F>(&self,
1990                          future: F) -> Result<()>
1991        where F: Future<Output = Result<V>> + Send + 'static {
1992        if let Some(rt) = local_async_runtime() {
1993            //本地线程有绑定运行时
1994            let producor = self.producor.clone();
1995            rt.spawn(async move {
1996                let value = future.await;
1997                producor.into_send_async(value).await;
1998            })
1999        } else {
2000            //本地线程未绑定运行时
2001            Err(Error::new(ErrorKind::Other, format!("Spawn wait any task failed, reason: local async runtime not exist")))
2002        }
2003    }
2004}
2005
2006/*
2007* 等待任意异步任务执行完成异步方法
2008*/
2009impl<V: Send + 'static> AsyncWaitAny<V> {
2010    /// 异步等待任意已派发任务的结果
2011    pub async fn wait_result(self) -> Result<V> {
2012        match self.consumer.recv_async().await {
2013            Err(e) => {
2014                //接收错误,则立即返回
2015                Err(Error::new(ErrorKind::Other, format!("Wait any result failed, reason: {:?}", e)))
2016            },
2017            Ok(result) => {
2018                //接收成功,则立即返回
2019                result
2020            },
2021        }
2022    }
2023}
2024
2025///
2026/// 等待任意异步任务执行完成
2027///
2028pub struct AsyncWaitAnyCallback<V: Send + 'static> {
2029    capacity:   usize,                      //派发任务的容量
2030    producor:   AsyncSender<Result<V>>,     //异步返回值生成器
2031    consumer:   AsyncReceiver<Result<V>>,   //异步返回值接收器
2032}
2033
2034unsafe impl<V: Send + 'static> Send for AsyncWaitAnyCallback<V> {}
2035unsafe impl<V: Send + 'static> Sync for AsyncWaitAnyCallback<V> {}
2036
2037/*
2038* 等待任意异步任务执行完成同步方法
2039*/
2040impl<V: Send + 'static> AsyncWaitAnyCallback<V> {
2041    /// 派发指定任务到指定的运行时,并返回派发是否成功
2042    pub fn spawn<RT, O, F>(&self,
2043                           rt: RT,
2044                           future: F) -> Result<()>
2045        where RT: AsyncRuntime<O>,
2046              O: Default + 'static,
2047              F: Future<Output = Result<V>> + Send + 'static {
2048        let producor = self.producor.clone();
2049        rt.spawn_by_id(rt.alloc::<O>(), async move {
2050            let value = future.await;
2051            producor.into_send_async(value).await;
2052
2053            //返回异步任务的默认值
2054            Default::default()
2055        })
2056    }
2057
2058    /// 派发指定任务到本地运行时,并返回派发是否成功
2059    pub fn spawn_local<F>(&self,
2060                          future: F) -> Result<()>
2061        where F: Future<Output = Result<V>> + Send + 'static {
2062        if let Some(rt) = local_async_runtime() {
2063            //当前线程有绑定运行时
2064            let producor = self.producor.clone();
2065            rt.spawn(async move {
2066                let value = future.await;
2067                producor.into_send_async(value).await;
2068            })
2069        } else {
2070            //当前线程未绑定运行时
2071            Err(Error::new(ErrorKind::Other, format!("Spawn wait any task failed by callback, reason: current async runtime not exist")))
2072        }
2073    }
2074}
2075
2076/*
2077* 等待任意异步任务执行完成异步方法
2078*/
2079impl<V: Send + 'static> AsyncWaitAnyCallback<V> {
2080    /// 异步等待满足用户回调需求的已派发任务的结果
2081    pub async fn wait_result(mut self,
2082                             callback: impl Fn(&Result<V>) -> bool + Send + Sync + 'static) -> Result<V> {
2083        let checker = create_checker(self.capacity, callback);
2084        loop {
2085            match self.consumer.recv_async().await {
2086                Err(e) => {
2087                    //接收错误,则立即返回
2088                    return Err(Error::new(ErrorKind::Other, format!("Wait any result failed by callback, reason: {:?}", e)));
2089                },
2090                Ok(result) => {
2091                    //接收成功,则检查是否立即返回
2092                    if checker(&result) {
2093                        //检查通过,则立即唤醒等待的任务,否则等待其它任务唤醒
2094                        return result;
2095                    }
2096                },
2097            }
2098        }
2099    }
2100}
2101
2102// 根据用户提供的回调,生成检查器
2103fn create_checker<V, F>(len: usize,
2104                        callback: F) -> Arc<dyn Fn(&Result<V>) -> bool + Send + Sync + 'static>
2105    where V: Send + 'static,
2106          F: Fn(&Result<V>) -> bool + Send + Sync + 'static {
2107    let mut check_counter = AtomicUsize::new(len); //初始化检查计数器
2108    Arc::new(move |result| {
2109        if check_counter.fetch_sub(1, Ordering::SeqCst) == 1 {
2110            //最后一个任务的检查,则忽略用户回调,并立即返回成功
2111            true
2112        } else {
2113            //不是最后一个任务的检查,则调用用户回调,并根据用户回调确定是否成功
2114            callback(result)
2115        }
2116    })
2117}
2118
2119///
2120/// 异步映射归并
2121///
2122pub struct AsyncMapReduce<V: Send + 'static> {
2123    count:          usize,                              //派发的任务数量
2124    capacity:       usize,                              //派发任务的容量
2125    producor:       AsyncSender<(usize, Result<V>)>,    //异步返回值生成器
2126    consumer:       AsyncReceiver<(usize, Result<V>)>,  //异步返回值接收器
2127}
2128
2129unsafe impl<V: Send + 'static> Send for AsyncMapReduce<V> {}
2130
2131/*
2132* 异步映射归并同步方法
2133*/
2134impl<V: Send + 'static> AsyncMapReduce<V> {
2135    /// 映射指定任务到指定的运行时,并返回任务序号
2136    pub fn map<RT, O, F>(&mut self, rt: RT, future: F) -> Result<usize>
2137        where RT: AsyncRuntime<O>,
2138              O: Default + 'static,
2139              F: Future<Output = Result<V>> + Send + 'static {
2140        if self.count >= self.capacity {
2141            //已派发任务已达可派发任务的限制,则返回错误
2142            return Err(Error::new(ErrorKind::Other, format!("Map task to runtime failed, capacity: {}, reason: out of capacity", self.capacity)));
2143        }
2144
2145        let index = self.count;
2146        let producor = self.producor.clone();
2147        rt.spawn_by_id(rt.alloc::<O>(), async move {
2148            let value = future.await;
2149            producor.into_send_async((index, value)).await;
2150
2151            //返回异步任务的默认值
2152            Default::default()
2153        })?;
2154
2155        self.count += 1; //派发任务成功,则计数
2156        Ok(index)
2157    }
2158}
2159
2160/*
2161* 异步映射归并异步方法
2162*/
2163impl<V: Send + 'static> AsyncMapReduce<V> {
2164    /// 归并所有派发的任务
2165    pub async fn reduce(self, order: bool) -> Result<Vec<Result<V>>> {
2166        let mut count = self.count;
2167        let mut results = Vec::with_capacity(count);
2168        while count > 0 {
2169            match self.consumer.recv_async().await {
2170                Err(e) => {
2171                    //接收错误,则立即返回
2172                    return Err(Error::new(ErrorKind::Other, format!("Reduce result failed, reason: {:?}", e)));
2173                },
2174                Ok((index, result)) => {
2175                    //接收成功,则继续
2176                    results.push((index, result));
2177                    count -= 1;
2178                },
2179            }
2180        }
2181
2182        if order {
2183            //需要对结果集进行排序
2184            results.sort_by_key(|(key, _value)| {
2185                key.clone()
2186            });
2187        }
2188        let (_, values) = results
2189            .into_iter()
2190            .unzip::<usize, Result<V>, Vec<usize>, Vec<Result<V>>>();
2191
2192        Ok(values)
2193    }
2194}
2195
2196///
2197/// 异步管道过滤器结果
2198///
2199pub enum AsyncPipelineResult<O: 'static> {
2200    Disconnect,     //关闭管道
2201    Filtered(O),    //过滤后的值
2202}
2203
2204///
2205/// 派发一个工作线程
2206/// 返回线程的句柄,可以通过句柄关闭线程
2207/// 线程在没有任务可以执行时会休眠,当派发任务或唤醒任务时会自动唤醒线程
2208///
2209pub fn spawn_worker_thread<F0, F1>(thread_name: &str,
2210                                   thread_stack_size: usize,
2211                                   thread_handler: Arc<AtomicBool>,
2212                                   thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>, //用于唤醒运行时所在线程的条件变量
2213                                   sleep_timeout: u64,                                  //休眠超时时长,单位毫秒
2214                                   loop_interval: Option<u64>,                          //工作者线程循环的间隔时长,None为无间隔,单位毫秒
2215                                   loop_func: F0,
2216                                   get_queue_len: F1) -> Arc<AtomicBool>
2217    where F0: Fn() -> (bool, Duration) + Send + 'static,
2218          F1: Fn() -> usize + Send + 'static {
2219    let thread_status_copy = thread_handler.clone();
2220
2221    thread::Builder::new()
2222        .name(thread_name.to_string())
2223        .stack_size(thread_stack_size).spawn(move || {
2224        let mut sleep_count = 0;
2225
2226        while thread_handler.load(Ordering::Relaxed) {
2227            let (is_no_task, run_time) = loop_func();
2228
2229            if is_no_task {
2230                //当前没有任务
2231                if sleep_count > 1 {
2232                    //当前没有任务连续达到2次,则休眠线程
2233                    sleep_count = 0; //重置休眠计数
2234                    let (is_sleep, lock, condvar) = &*thread_waker;
2235                    let mut locked = lock.lock();
2236                    if get_queue_len() > 0 {
2237                        //当前有任务,则继续工作
2238                        continue;
2239                    }
2240
2241                    if !is_sleep.load(Ordering::Relaxed) {
2242                        //如果当前未休眠,则休眠
2243                        is_sleep.store(true, Ordering::SeqCst);
2244                        if condvar
2245                            .wait_for(
2246                                &mut locked,
2247                                Duration::from_millis(sleep_timeout),
2248                            )
2249                            .timed_out()
2250                        {
2251                            //条件超时唤醒,则设置状态为未休眠
2252                            is_sleep.store(false, Ordering::SeqCst);
2253                        }
2254                    }
2255
2256                    continue; //唤醒后立即尝试执行任务
2257                }
2258
2259                sleep_count += 1; //休眠计数
2260                if let Some(interval) = &loop_interval {
2261                    //设置了循环间隔时长
2262                    if let Some(remaining_interval) = Duration::from_millis(*interval).checked_sub(run_time){
2263                        //本次运行少于循环间隔,则休眠剩余的循环间隔,并继续执行任务
2264                        thread::sleep(remaining_interval);
2265                    }
2266                }
2267            } else {
2268                //当前有任务
2269                sleep_count = 0; //重置休眠计数
2270                if let Some(interval) = &loop_interval {
2271                    //设置了循环间隔时长
2272                    if let Some(remaining_interval) = Duration::from_millis(*interval).checked_sub(run_time){
2273                        //本次运行少于循环间隔,则休眠剩余的循环间隔,并继续执行任务
2274                        thread::sleep(remaining_interval);
2275                    }
2276                }
2277            }
2278        }
2279    });
2280
2281    thread_status_copy
2282}
2283
2284/// 唤醒工作者所在线程,如果线程当前正在运行,则忽略
2285pub fn wakeup_worker_thread<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>(worker_waker: &Arc<(AtomicBool, Mutex<()>, Condvar)>, rt: &SingleTaskRuntime<O, P>) {
2286    //检查工作者所在线程是否需要唤醒
2287    if worker_waker.0.load(Ordering::Relaxed) && rt.len() > 0 {
2288        let (is_sleep, lock, condvar) = &**worker_waker;
2289        let _locked = lock.lock();
2290        is_sleep.store(false, Ordering::SeqCst); //设置为未休眠
2291        let _ = condvar.notify_one();
2292    }
2293}
2294
2295/// 注册全局异常处理器,会替换当前全局异常处理器
2296pub fn register_global_panic_handler<Handler>(handler: Handler)
2297    where Handler: Fn(thread::Thread, String, Option<String>, Option<(String, u32, u32)>) -> Option<i32> + Send + Sync + 'static {
2298    set_hook(Box::new(move |panic_info| {
2299        let thread_info = thread::current();
2300
2301        let payload = panic_info.payload();
2302        let payload_info = match payload.downcast_ref::<&str>() {
2303            None => {
2304                //不是String
2305                match payload.downcast_ref::<String>() {
2306                    None => {
2307                        //不是&'static str,则返回未知异常
2308                        "Unknow panic".to_string()
2309                    },
2310                    Some(info) => {
2311                        info.clone()
2312                    }
2313                }
2314            },
2315            Some(info) => {
2316                info.to_string()
2317            }
2318        };
2319
2320        let other_info = if let Some(arg) = panic_info.payload_as_str() {
2321            Some(arg.to_string())
2322        } else {
2323            None
2324        };
2325
2326        let location = if let Some(location) = panic_info.location() {
2327            Some((location.file().to_string(), location.line(), location.column()))
2328        } else {
2329            None
2330        };
2331
2332        if let Some(exit_code) = handler(thread_info, payload_info, other_info, location) {
2333            //需要关闭当前进程
2334            std::process::exit(exit_code);
2335        }
2336    }));
2337}
2338
2339/// 替换全局内存分配错误处理器
2340pub fn replace_global_alloc_error_handler() {
2341    set_alloc_error_hook(global_alloc_error_handle);
2342}
2343
2344fn global_alloc_error_handle(layout: Layout) {
2345    let bt = Backtrace::new();
2346    eprintln!("[UTC: {}][Thread: {}]Global memory allocation of {:?} bytes failed, stacktrace: \n{:?}",
2347              SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis(),
2348              thread::current().name().unwrap_or(""),
2349              layout.size(),
2350              bt);
2351}
2352
2353// 立即异步让出当前任务执行
2354pub(crate) struct YieldNow(bool);
2355
2356impl Future for YieldNow {
2357    type Output = ();
2358
2359    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2360        if self.0 {
2361            Poll::Ready(())
2362        } else {
2363            self.0 = true;
2364            cx.waker().wake_by_ref();
2365            Poll::Pending
2366        }
2367    }
2368}