pi_async_rt/rt/
serial.rs

1use std::thread;
2use std::any::Any;
3use std::pin::Pin;
4use std::ptr::null_mut;
5use std::vec::IntoIter;
6use std::time::Duration;
7use std::future::Future;
8use std::marker::PhantomData;
9use std::ops::{Deref, DerefMut};
10use std::cell::{RefCell, UnsafeCell};
11use std::task::{Poll, Waker, Context};
12use std::io::{Error, Result, ErrorKind};
13use std::fmt::{Debug, Formatter, Result as FmtResult};
14use std::sync::{Arc, atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering}};
15
16use futures::{future::{FutureExt, LocalBoxFuture},
17              stream::{Stream, StreamExt, LocalBoxStream},
18              task::ArcWake};
19use parking_lot::{Mutex, Condvar};
20use crossbeam_queue::ArrayQueue;
21use crossbeam_channel::{Sender, Receiver, unbounded};
22use flume::{Sender as AsyncSender, Receiver as AsyncReceiver};
23#[cfg(not(target_arch = "wasm32"))]
24use polling::Poller;
25use num_cpus;
26
27use pi_cancel_timer::Timer;
28use slotmap::{Key, KeyData};
29use quanta::{Clock, Instant as QInstant};
30
31use crate::{lock::spin,
32            rt::{PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME, TaskId, AsyncPipelineResult,
33                 serial_local_thread::{LocalTaskRunner, LocalTaskRuntime},
34                 serial_single_thread::SingleTaskRuntime,
35                 serial_worker_thread::{WorkerTaskRunner, WorkerRuntime}}};
36
37///
38/// 顺序执行的异步任务
39///
40pub struct AsyncTask<
41    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
42    O: Default + 'static = (),
43> {
44    uid:        TaskId,                                     //任务唯一id
45    future:     Mutex<Option<LocalBoxFuture<'static, O>>>,  //异步任务
46    pool:       Arc<P>,                                     //异步任务池
47    priority:   usize,                                      //异步任务优先级
48    context:    Option<UnsafeCell<Box<dyn Any>>>,           //异步任务上下文
49}
50
51unsafe impl<
52    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
53    O: Default + 'static,
54> Send for AsyncTask<P, O> {}
55unsafe impl<
56    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
57    O: Default + 'static,
58> Sync for AsyncTask<P, O> {}
59
60impl<
61    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
62    O: Default + 'static,
63> ArcWake for AsyncTask<P, O> {
64    #[cfg(not(target_arch = "aarch64"))]
65    fn wake_by_ref(arc_self: &Arc<Self>) {
66        let pool = arc_self.get_pool();
67        let _ = pool.push_keep(arc_self.clone());
68
69        if let Some(waits) = pool.get_waits() {
70            //当前任务属于多线程异步运行时
71            if let Some(worker_waker) = waits.pop() {
72                //有待唤醒的工作者
73                let (is_sleep, lock, condvar) = &*worker_waker;
74                let locked = lock.lock();
75                if is_sleep.load(Ordering::Relaxed) {
76                    //待唤醒的工作者,正在休眠,则立即唤醒此工作者
77                    if let Ok(true) = is_sleep
78                        .compare_exchange_weak(true,
79                                               false,
80                                               Ordering::SeqCst,
81                                               Ordering::SeqCst) {
82                        //确认需要唤醒,则唤醒
83                        condvar.notify_one();
84                    }
85                }
86            }
87        } else {
88            //当前线程属于单线程异步运行时
89            if let Some(thread_waker) = pool.get_thread_waker() {
90                //当前任务池绑定了所在线程的唤醒器,则快速检查是否需要唤醒所在线程
91                if thread_waker.0.load(Ordering::Relaxed) {
92                    let (is_sleep, lock, condvar) = &**thread_waker;
93                    let locked = lock.lock();
94                    //待唤醒的线程,正在休眠,则立即唤醒此线程
95                    if let Ok(true) = is_sleep
96                        .compare_exchange_weak(true,
97                                               false,
98                                               Ordering::SeqCst,
99                                               Ordering::SeqCst) {
100                        //确认需要唤醒,则唤醒
101                        condvar.notify_one();
102                    }
103                }
104            }
105        }
106    }
107    #[cfg(target_arch = "aarch64")]
108    fn wake_by_ref(arc_self: &Arc<Self>) {
109        let pool = arc_self.get_pool();
110        let _ = pool.push_keep(arc_self.clone());
111
112        if let Some(waits) = pool.get_waits() {
113            //当前任务属于多线程异步运行时
114            if let Some(worker_waker) = waits.pop() {
115                //有待唤醒的工作者
116                let (is_sleep, lock, condvar) = &*worker_waker;
117                let locked = lock.lock();
118                if is_sleep.load(Ordering::Relaxed) {
119                    //待唤醒的工作者,正在休眠,则立即唤醒此工作者
120                    if let Ok(true) = is_sleep
121                        .compare_exchange(true,
122                                          false,
123                                          Ordering::SeqCst,
124                                          Ordering::SeqCst) {
125                        //确认需要唤醒,则唤醒
126                        condvar.notify_one();
127                    }
128                }
129            }
130        } else {
131            //当前线程属于单线程异步运行时
132            if let Some(thread_waker) = pool.get_thread_waker() {
133                //当前任务池绑定了所在线程的唤醒器,则快速检查是否需要唤醒所在线程
134                if thread_waker.0.load(Ordering::Relaxed) {
135                    let (is_sleep, lock, condvar) = &**thread_waker;
136                    let locked = lock.lock();
137                    //待唤醒的线程,正在休眠,则立即唤醒此线程
138                    if let Ok(true) = is_sleep
139                        .compare_exchange(true,
140                                          false,
141                                          Ordering::SeqCst,
142                                          Ordering::SeqCst) {
143                        //确认需要唤醒,则唤醒
144                        condvar.notify_one();
145                    }
146                }
147            }
148        }
149    }
150}
151
152impl<
153    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
154    O: Default + 'static,
155> AsyncTask<P, O> {
156    /// 构建单线程任务
157    pub fn new(uid: TaskId,
158               pool: Arc<P>,
159               priority: usize,
160               future: Option<LocalBoxFuture<'static, O>>) -> AsyncTask<P, O> {
161        AsyncTask {
162            uid,
163            future: Mutex::new(future),
164            pool,
165            priority,
166            context: None,
167        }
168    }
169
170    /// 使用指定上下文构建单线程任务
171    pub fn with_context<C: 'static>(uid: TaskId,
172                                    pool: Arc<P>,
173                                    priority: usize,
174                                    future: Option<LocalBoxFuture<'static, O>>,
175                                    context: C) -> AsyncTask<P, O> {
176        let any = Box::new(context);
177
178        AsyncTask {
179            uid,
180            future: Mutex::new(future),
181            pool,
182            priority,
183            context: Some(UnsafeCell::new(any)),
184        }
185    }
186
187    /// 使用指定异步运行时和上下文构建单线程任务
188    pub fn with_runtime_and_context<RT, C>(runtime: &RT,
189                                           priority: usize,
190                                           future: Option<LocalBoxFuture<'static, O>>,
191                                           context: C) -> AsyncTask<P, O>
192        where RT: AsyncRuntime<O, Pool = P>,
193              C: 'static {
194        let any = Box::new(context);
195
196        AsyncTask {
197            uid: runtime.alloc::<O>(),
198            future: Mutex::new(future),
199            pool: runtime.shared_pool(),
200            priority,
201            context: Some(UnsafeCell::new(any)),
202        }
203    }
204
205    /// 检查是否允许唤醒
206    pub fn is_enable_wakeup(&self) -> bool {
207        self.uid.exist_waker::<O>()
208    }
209
210    /// 获取内部任务
211    pub fn get_inner(&self) -> Option<LocalBoxFuture<'static, O>> {
212        self.future.lock().take()
213    }
214
215    /// 设置内部任务
216    pub fn set_inner(&self, inner: Option<LocalBoxFuture<'static, O>>) {
217        *self.future.lock() = inner;
218    }
219
220    /// 获取任务的所有者
221    #[inline]
222    pub fn owner(&self) -> usize {
223        unsafe {
224            *self.uid.0.get() as usize
225        }
226    }
227
228    /// 获取异步任务优先级
229    pub fn priority(&self) -> usize {
230        self.priority
231    }
232
233    //判断异步任务是否有上下文
234    pub fn exist_context(&self) -> bool {
235        self.context.is_some()
236    }
237
238    //获取异步任务上下文的只读引用
239    pub fn get_context<C: 'static>(&self) -> Option<&C> {
240        if let Some(context) = &self.context {
241            //存在上下文
242            let any = unsafe { &*context.get() };
243            return <dyn Any>::downcast_ref::<C>(&**any);
244        }
245
246        None
247    }
248
249    //获取异步任务上下文的可写引用
250    pub fn get_context_mut<C: 'static>(&self) -> Option<&mut C> {
251        if let Some(context) = &self.context {
252            //存在上下文
253            let any = unsafe { &mut *context.get() };
254            return <dyn Any>::downcast_mut::<C>(&mut **any);
255        }
256
257        None
258    }
259
260    //设置异步任务上下文,返回上一个异步任务上下文
261    pub fn set_context<C: 'static>(&self, new: C) {
262        if let Some(context) = &self.context {
263            //存在上一个上下文,则释放上一个上下文
264            let _ = unsafe { &*context.get() };
265
266            //设置新的上下文
267            let any: Box<dyn Any + 'static> = Box::new(new);
268            unsafe { *context.get() = any; }
269        }
270    }
271
272    //获取异步任务的任务池
273    pub fn get_pool(&self) -> &P {
274        self.pool.as_ref()
275    }
276}
277
278///
279/// 异步任务池
280///
281pub trait AsyncTaskPool<O: Default + 'static = ()>: Default + 'static {
282    type Pool: AsyncTaskPoolExt<O> + AsyncTaskPool<O>;
283
284    /// 获取绑定的线程唯一id
285    fn get_thread_id(&self) -> usize;
286
287    /// 获取当前异步任务池内任务数量
288    fn len(&self) -> usize;
289
290    /// 将异步任务加入异步任务池
291    fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
292
293    /// 将异步任务加入本地异步任务池
294    fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
295
296    /// 将指定了优先级的异步任务加入任务池
297    fn push_priority(&self, priority: usize, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
298
299    /// 异步任务被唤醒时,将异步任务继续加入异步任务池
300    fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
301
302    /// 尝试从异步任务池中弹出一个异步任务
303    fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>>;
304
305    /// 尝试从异步任务池中弹出所有异步任务
306    fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>>;
307
308    /// 获取本地线程的唤醒器
309    fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>> {
310        None
311    }
312}
313
314///
315/// 异步任务池扩展
316///
317pub trait AsyncTaskPoolExt<O: Default + 'static = ()>: 'static {
318    /// 设置待唤醒的工作者唤醒器队列
319    fn set_waits(&mut self,
320                 _waits: Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>) {}
321
322    /// 获取待唤醒的工作者唤醒器队列
323    fn get_waits(&self) -> Option<&Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>> {
324        //默认没有待唤醒的工作者唤醒器队列
325        None
326    }
327
328    /// 获取空闲的工作者的数量,这个数量大于0,表示可以新开线程来运行可分派的工作者
329    fn idler_len(&self) -> usize {
330        //默认不分派
331        0
332    }
333
334    /// 分派一个空闲的工作者
335    fn spawn_worker(&self) -> Option<usize> {
336        //默认不分派
337        None
338    }
339
340    /// 获取工作者的数量
341    fn worker_len(&self) -> usize {
342        //默认工作者数量和本机逻辑核数相同
343        #[cfg(not(target_arch = "wasm32"))]
344        return num_cpus::get();
345        #[cfg(target_arch = "wasm32")]
346        return 1;
347    }
348
349    /// 获取缓冲区的任务数量,缓冲区任务是未分配给工作者的任务
350    fn buffer_len(&self) -> usize {
351        //默认没有缓冲区
352        0
353    }
354
355    /// 设置当前绑定本地线程的唤醒器
356    fn set_thread_waker(&mut self, _thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) {
357        //默认不设置
358    }
359
360    /// 复制当前绑定本地线程的唤醒器
361    fn clone_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
362        //默认不复制
363        None
364    }
365
366    /// 关闭当前工作者
367    fn close_worker(&self) {
368        //默认不允许关闭工作者
369    }
370}
371
372///
373/// 顺序执行任务的异步运行时
374///
375pub trait AsyncRuntime<O: Default + 'static = ()>: Clone + Send + Sync + 'static {
376    type Pool: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = Self::Pool>;
377
378    /// 共享运行时内部任务池
379    fn shared_pool(&self) -> Arc<Self::Pool>;
380
381    /// 获取当前异步运行时的唯一id
382    fn get_id(&self) -> usize;
383
384    /// 获取当前异步运行时待处理任务数量
385    fn wait_len(&self) -> usize;
386
387    /// 获取当前异步运行时任务数量
388    fn len(&self) -> usize;
389
390    /// 分配异步任务的唯一id
391    fn alloc<R: 'static>(&self) -> TaskId;
392
393    /// 派发一个指定的异步任务到异步运行时
394    fn spawn<F>(&self, future: F) -> Result<TaskId>
395        where F: Future<Output = O> + 'static;
396
397    /// 派发一个异步任务到本地异步运行时,如果本地没有本异步运行时,则会派发到当前运行时中
398    fn spawn_local<F>(&self, future: F) -> Result<TaskId>
399        where F: Future<Output = O> + 'static;
400
401    /// 派发一个指定优先级的异步任务到异步运行时
402    fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
403        where F: Future<Output = O> + 'static;
404
405    /// 派发一个异步任务到异步运行时,并立即让出任务的当前运行
406    fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
407        where F: Future<Output = O> + 'static;
408
409    /// 派发一个在指定时间后执行的异步任务到异步运行时,时间单位ms
410    fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
411        where F: Future<Output = O> + 'static;
412
413    /// 派发一个指定任务唯一id的异步任务到异步运行时
414    fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
415        where F: Future<Output = O> + 'static;
416
417    /// 派发一个指定任务唯一id的异步任务到本地异步运行时,如果本地没有本异步运行时,则会派发到当前运行时中
418    fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
419        where F: Future<Output = O> + 'static;
420
421    /// 派发一个指定任务唯一id和任务优先级的异步任务到异步运行时
422    fn spawn_priority_by_id<F>(&self,
423                               task_id: TaskId,
424                               priority: usize,
425                               future: F) -> Result<()>
426        where F: Future<Output = O> + 'static;
427
428    /// 派发一个指定任务唯一id的异步任务到异步运行时,并立即让出任务的当前运行
429    fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
430        where F: Future<Output = O> + 'static;
431
432    /// 派发一个指定任务唯一id和在指定时间后执行的异步任务到异步运行时,时间单位ms
433    fn spawn_timing_by_id<F>(&self,
434                             task_id: TaskId,
435                             future: F,
436                             time: usize) -> Result<()>
437        where F: Future<Output = O> + 'static;
438
439    /// 挂起指定唯一id的异步任务
440    fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output>;
441
442    /// 唤醒指定唯一id的异步任务
443    fn wakeup<Output: 'static>(&self, task_id: &TaskId);
444
445    /// 挂起当前异步运行时的当前任务,并在指定的其它运行时上派发一个指定的异步任务,等待其它运行时上的异步任务完成后,唤醒当前运行时的当前任务,并返回其它运行时上的异步任务的值
446    fn wait<V: 'static>(&self) -> AsyncWait<V>;
447
448    /// 挂起当前异步运行时的当前任务,并在多个其它运行时上执行多个其它任务,其中任意一个任务完成,则唤醒当前运行时的当前任务,并返回这个已完成任务的值,而其它未完成的任务的值将被忽略
449    fn wait_any<V: 'static>(&self, capacity: usize) -> AsyncWaitAny<V>;
450
451    /// 挂起当前异步运行时的当前任务,并在多个其它运行时上执行多个其它任务,任务返回后需要通过用户指定的检查回调进行检查,其中任意一个任务检查通过,则唤醒当前运行时的当前任务,并返回这个已完成任务的值,而其它未完成或未检查通过的任务的值将被忽略,如果所有任务都未检查通过,则强制唤醒当前运行时的当前任务
452    fn wait_any_callback<V: 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V>;
453
454    /// 构建用于派发多个异步任务到指定运行时的映射归并,需要指定映射归并的容量
455    fn map_reduce<V: 'static>(&self, capacity: usize) -> AsyncMapReduce<V>;
456
457    /// 挂起当前异步运行时的当前任务,等待指定的时间后唤醒当前任务
458    fn timeout(&self, timeout: usize) -> LocalBoxFuture<'static, ()>;
459
460    /// 立即让出当前任务的执行
461    fn yield_now(&self) -> LocalBoxFuture<'static, ()>;
462
463    /// 生成一个异步管道,输入指定流,输入流的每个值通过过滤器生成输出流的值
464    fn pipeline<S, SO, F, FO>(&self, input: S, filter: F) -> LocalBoxStream<'static, FO>
465        where S: Stream<Item = SO> + 'static,
466              SO: 'static,
467              F: FnMut(SO) -> AsyncPipelineResult<FO> + 'static,
468              FO: 'static;
469
470    /// 关闭异步运行时,返回请求关闭是否成功
471    fn close(&self) -> bool;
472}
473
474///
475/// 顺序执行的异步运行时扩展
476///
477pub trait AsyncRuntimeExt<O: Default + 'static = ()> {
478    /// 派发一个指定的异步任务到异步运行时,并指定异步任务的初始化上下文
479    fn spawn_with_context<F, C>(&self,
480                                task_id: TaskId,
481                                future: F,
482                                context: C) -> Result<()>
483        where F: Future<Output = O> + 'static,
484              C: 'static;
485
486    /// 派发一个在指定时间后执行的异步任务到异步运行时,并指定异步任务的初始化上下文,时间单位ms
487    fn spawn_timing_with_context<F, C>(&self,
488                                       task_id: TaskId,
489                                       future: F,
490                                       context: C,
491                                       time: usize) -> Result<()>
492        where F: Future<Output = O> + 'static,
493              C: 'static;
494
495    /// 立即创建一个指定任务池的异步运行时,并执行指定的异步任务,阻塞当前线程,等待异步任务完成后返回
496    fn block_on<F>(&self, future: F) -> Result<F::Output>
497        where F: Future + 'static,
498              <F as Future>::Output: Default + 'static;
499}
500
501///
502/// 异步运行时构建器
503///
504pub struct AsyncRuntimeBuilder<O: Default + 'static = ()>(PhantomData<O>);
505
506impl<O: Default + 'static> AsyncRuntimeBuilder<O> {
507    /// 构建默认的本地异步任务运行时
508    pub fn default_local_thread(name: Option<&str>,
509                                stack_size: Option<usize>) -> LocalTaskRuntime<O> {
510        let runner = LocalTaskRunner::new();
511
512        let thread_name = if let Some(name) = name {
513            name
514        } else {
515            //默认的线程名称
516            "Default-Local-RT"
517        };
518        let thread_stack_size = if let Some(size) = stack_size {
519            size
520        } else {
521            //默认的线程堆栈大小
522            2 * 1024 * 1024
523        };
524
525        runner.startup(thread_name, thread_stack_size)
526    }
527
528    /// 构建默认的工作者异步运行时
529    pub fn default_worker_thread(worker_name: Option<&str>,
530                                 worker_stack_size: Option<usize>,
531                                 worker_sleep_timeout: Option<u64>,
532                                 worker_loop_interval: Option<Option<u64>>) -> WorkerRuntime<O> {
533        let runner = WorkerTaskRunner::default();
534
535        let thread_name = if let Some(name) = worker_name {
536            name
537        } else {
538            //默认的线程名称
539            "Default-Single-Worker"
540        };
541        let thread_stack_size = if let Some(size) = worker_stack_size {
542            size
543        } else {
544            //默认的线程堆栈大小
545            2 * 1024 * 1024
546        };
547        let sleep_timeout = if let Some(timeout) = worker_sleep_timeout {
548            timeout
549        } else {
550            //默认的线程休眠时长
551            1
552        };
553        let loop_interval = if let Some(interval) = worker_loop_interval {
554            interval
555        } else {
556            //默认的线程循环间隔时长
557            None
558        };
559
560        //创建线程并在线程中执行异步运行时
561        let clock = Clock::new();
562        let runner_copy = runner.clone();
563        let rt_copy = runner.get_runtime();
564        let rt = runner.startup(
565            thread_name,
566            thread_stack_size,
567            sleep_timeout,
568            loop_interval,
569            move || {
570                let now = clock.recent();
571                match runner_copy.run_once() {
572                    Err(e) => {
573                        panic!("Run runner failed, reason: {:?}", e);
574                    },
575                    Ok(len) => {
576                        (len == 0,
577                         clock
578                             .recent()
579                             .duration_since(now))
580                    },
581                }
582            },
583            move || {
584                rt_copy.wait_len() + rt_copy.len()
585            },
586        );
587
588        rt
589    }
590
591    /// 构建自定义的本地异步任务运行时
592    #[cfg(not(target_arch = "wasm32"))]
593    pub fn custom_local_thread(name: Option<&str>,
594                               stack_size: Option<usize>,
595                               poller: Option<Arc<Poller>>,
596                               try_count: Option<usize>,
597                               timeout: Option<Duration>,) -> LocalTaskRuntime<O> {
598        let poller = if let Some(poller) = poller {
599            poller
600        } else {
601            Arc::new(Poller::new().expect("Failed to create poller"))
602        };
603        let runner = LocalTaskRunner::with_poll(poller);
604
605        let thread_name = if let Some(name) = name {
606            name
607        } else {
608            //默认的线程名称
609            "Custom-Local-RT"
610        };
611        let thread_stack_size = if let Some(size) = stack_size {
612            size
613        } else {
614            //默认的线程堆栈大小
615            2 * 1024 * 1024
616        };
617        let try_count = try_count.unwrap_or(3);
618
619        runner.startup_with_poll(
620            thread_name,
621            thread_stack_size,
622            try_count,
623            timeout
624        )
625    }
626
627    /// 构建自定义的工作者异步运行时
628    pub fn custom_worker_thread<P, F0, F1>(pool: P,
629                                           worker_handle: Arc<AtomicBool>,
630                                           worker_condvar: Arc<(AtomicBool, Mutex<()>, Condvar)>,
631                                           thread_name: &str,
632                                           thread_stack_size: usize,
633                                           sleep_timeout: u64,
634                                           loop_interval: Option<u64>,
635                                           loop_func: F0,
636                                           get_queue_len: F1) -> WorkerRuntime<O, P>
637        where P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
638              F0: Fn() -> (bool, Duration) + Send + 'static,
639              F1: Fn() -> usize + Send + 'static {
640        let runner = WorkerTaskRunner::new(pool,
641                                           worker_handle,
642                                           worker_condvar);
643
644        //创建线程并在线程中执行异步运行时
645        let rt_copy = runner.get_runtime();
646        let rt = runner.startup(
647            thread_name,
648            thread_stack_size,
649            sleep_timeout,
650            loop_interval,
651            loop_func,
652            move || {
653                rt_copy.wait_len() + get_queue_len()
654            },
655        );
656
657        rt
658    }
659}
660
661/// 绑定指定异步运行时到本地线程
662pub fn bind_local_thread<O: Default + 'static>(runtime: LocalAsyncRuntime<O>) {
663    match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |rt| {
664        let raw = Arc::into_raw(Arc::new(runtime)) as *mut LocalAsyncRuntime<O> as *mut ();
665        rt.store(raw, Ordering::Relaxed);
666    }) {
667        Err(e) => {
668            panic!("Bind single runtime to local thread failed, reason: {:?}", e);
669        },
670        Ok(_) => (),
671    }
672}
673
674/// 从本地线程解绑单线程异步任务执行器
675pub fn unbind_local_thread() {
676    let _ = PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |rt| {
677        rt.store(null_mut(), Ordering::Relaxed);
678    });
679}
680
681///
682/// 本地线程绑定的异步运行时
683///
684pub struct LocalAsyncRuntime<O: Default + 'static> {
685    inner:              *const (),                                                          //内部运行时指针
686    get_id_func:        fn(*const ()) -> usize,                                             //获取本地运行时的id的函数
687    spawn_func:         fn(*const (), LocalBoxFuture<'static, O>) -> Result<()>,            //派发函数
688    spawn_timing_func:  fn(*const (), LocalBoxFuture<'static, O>, usize) -> Result<()>,     //定时派发函数
689    timeout_func:       fn(*const (), usize) -> LocalBoxFuture<'static, ()>,                //超时函数
690}
691
692unsafe impl<O: Default + 'static> Send for LocalAsyncRuntime<O> {}
693unsafe impl<O: Default + 'static> Sync for LocalAsyncRuntime<O> {}
694
695impl<O: Default + 'static> LocalAsyncRuntime<O> {
696    /// 创建本地线程绑定的异步运行时
697    pub fn new(inner: *const (),
698               get_id_func: fn(*const ()) -> usize,
699               spawn_func: fn(*const (), LocalBoxFuture<'static, O>) -> Result<()>,
700               spawn_timing_func: fn(*const (), LocalBoxFuture<'static, O>, usize) -> Result<()>,
701               timeout_func: fn(*const (), usize) -> LocalBoxFuture<'static, ()>) -> Self {
702        LocalAsyncRuntime {
703            inner,
704            get_id_func,
705            spawn_func,
706            spawn_timing_func,
707            timeout_func,
708        }
709    }
710
711    /// 获取本地运行时的id
712    #[inline]
713    pub fn get_id(&self) -> usize {
714        (self.get_id_func)(self.inner)
715    }
716
717    /// 派发一个指定的异步任务到本地线程绑定的异步运行时
718    #[inline]
719    pub fn spawn<F>(&self, future: F) -> Result<()>
720        where F: Future<Output = O> + 'static {
721        (self.spawn_func)(self.inner, async move {
722            future.await
723        }.boxed_local())
724    }
725
726    /// 定时派发一个指定的异步任务到本地线程绑定的异步运行时
727    #[inline]
728    pub fn sapwn_timing_func<F>(&self, future: F, timeout: usize) -> Result<()>
729        where F: Future<Output = O> + 'static {
730        (self.spawn_timing_func)(self.inner,
731                                 async move {
732                                     future.await
733                                 }.boxed_local(),
734                                 timeout)
735    }
736
737    /// 挂起本地线程绑定的异步运行时的当前任务,等待指定的时间后唤醒当前任务
738    #[inline]
739    pub fn timeout(&self, timeout: usize) -> LocalBoxFuture<'static, ()> {
740        (self.timeout_func)(self.inner, timeout)
741    }
742}
743
744///
745/// 获取本地线程绑定的顺序执行任务的异步运行时
746/// 注意:O如果与本地线程绑定的运行时的O不相同,则无法获取本地线程绑定的运行时
747///
748pub fn local_serial_async_runtime<O: Default + 'static>() -> Option<Arc<LocalAsyncRuntime<O>>> {
749    match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |ptr| {
750        let raw = ptr.load(Ordering::Relaxed) as *const LocalAsyncRuntime<O>;
751        unsafe {
752            if raw.is_null() {
753                //本地线程未绑定异步运行时
754                None
755            } else {
756                //本地线程已绑定异步运行时
757                let shared: Arc<LocalAsyncRuntime<O>> = unsafe { Arc::from_raw(raw) };
758                let result = shared.clone();
759                Arc::into_raw(shared); //避免提前释放
760                Some(result)
761            }
762        }
763    }) {
764        Err(_) => None, //本地线程没有绑定异步运行时
765        Ok(rt) => rt,
766    }
767}
768
769///
770/// 派发任务到本地线程绑定的异步运行时,如果本地线程没有异步运行时,则返回错误
771/// 注意:F::Output如果与本地线程绑定的运行时的O不相同,则无法执行指定任务
772///
773pub fn spawn_local<O, F>(future: F) -> Result<()>
774    where O: Default + 'static,
775          F: Future<Output = O> + 'static {
776    if let Some(rt) = local_serial_async_runtime::<O>() {
777        rt.spawn(future)
778    } else {
779        Err(Error::new(ErrorKind::Other, format!("Spawn task to local thread failed, reason: runtime not exist")))
780    }
781}
782
783///
784/// 获取本地线程绑定的异步运行时
785/// 注意:O如果与本地线程绑定的运行时的O不相同,则无法获取本地线程绑定的运行时
786///
787pub fn local_async_runtime<O: Default + 'static>() -> Option<Arc<LocalAsyncRuntime<O>>> {
788    match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |ptr| {
789        let raw = ptr.load(Ordering::Relaxed) as *const LocalAsyncRuntime<O>;
790        unsafe {
791            if raw.is_null() {
792                //本地线程未绑定异步运行时
793                None
794            } else {
795                //本地线程已绑定异步运行时
796                let shared: Arc<LocalAsyncRuntime<O>> = unsafe { Arc::from_raw(raw) };
797                let result = shared.clone();
798                Arc::into_raw(shared); //避免提前释放
799                Some(result)
800            }
801        }
802    }) {
803        Err(_) => None, //本地线程没有绑定异步运行时
804        Ok(rt) => rt,
805    }
806}
807
808///
809/// 同步非阻塞的异步值,只允许被同步非阻塞的设置一次值
810///
811pub struct AsyncValue<V: 'static>(Arc<InnerAsyncValue<V>>);
812
813unsafe impl<V: 'static> Send for AsyncValue<V> {}
814unsafe impl<V: 'static> Sync for AsyncValue<V> {}
815
816impl<V: 'static> Clone for AsyncValue<V> {
817    fn clone(&self) -> Self {
818        AsyncValue(self.0.clone())
819    }
820}
821
822impl<V: Send + 'static> Debug for AsyncValue<V> {
823    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
824        write!(f,
825               "AsyncValue[status = {}]",
826               self.0.status.load(Ordering::Acquire))
827    }
828}
829
830impl<V: 'static> Future for AsyncValue<V> {
831    type Output = V;
832
833    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
834        let mut spin_len = 1;
835        while self.0.status.load(Ordering::Acquire) == 2 {
836            //还未完成设置值,则自旋等待
837            spin_len = spin(spin_len);
838        }
839
840        if self.0.status.load(Ordering::Acquire) == 3 {
841            if let Some(value) = unsafe { (*(&self).0.value.get()).take() } {
842                //异步值已就绪
843                return Poll::Ready(value);
844            }
845        }
846
847        unsafe {
848            *self.0.waker.get() = Some(cx.waker().clone()); //设置异步值的唤醒器
849        }
850
851        let mut spin_len = 1;
852        loop {
853            match self.0.status.compare_exchange(0,
854                                                 1, Ordering::Acquire,
855                                                 Ordering::Relaxed) {
856                Err(2) => {
857                    //异步值准备设置值,则稍后重试
858                    spin_len = spin(spin_len);
859                    continue;
860                },
861                Err(3) => {
862                    //异步值已就绪
863                    let value = unsafe { (*(&self).0.value.get()).take().unwrap() };
864                    return Poll::Ready(value);
865                },
866                Err(_) => {
867                    unimplemented!();
868                },
869                Ok(_) => {
870                    //异步值等待设置后唤醒
871                    return Poll::Pending;
872                },
873            }
874        }
875    }
876}
877
878/*
879* 同步非阻塞的异步值同步方法
880*/
881impl<V: 'static> AsyncValue<V> {
882    /// 构建异步值,默认值为未就绪
883    pub fn new() -> Self {
884        let inner = InnerAsyncValue {
885            value: UnsafeCell::new(None),
886            waker: UnsafeCell::new(None),
887            status: AtomicU8::new(0),
888        };
889
890        AsyncValue(Arc::new(inner))
891    }
892
893    /// 判断异步值是否已完成设置
894    pub fn is_complete(&self) -> bool {
895        self
896            .0
897            .status
898            .load(Ordering::Relaxed) == 3
899    }
900
901    /// 设置异步值
902    pub fn set(self, value: V) {
903        loop {
904            match self.0.status.compare_exchange(1,
905                                                 2,
906                                                 Ordering::Acquire,
907                                                 Ordering::Relaxed) {
908                Err(0) => {
909                    match self.0.status.compare_exchange(0,
910                                                         2,
911                                                         Ordering::Acquire,
912                                                         Ordering::Relaxed) {
913                        Err(1) => {
914                            //异步值的唤醒器已就绪,则继续尝试获取锁
915                            continue;
916                        },
917                        Err(_) => {
918                            //异步值正在设置或已完成设置,则立即返回
919                            return;
920                        },
921                        Ok(_) => {
922                            //异步值的唤醒器未就绪且获取到锁,则设置异步值后将状态设置为已完成设置,并立即返回
923                            unsafe { *self.0.value.get() = Some(value); }
924                            self.0.status.store(3, Ordering::Release);
925                            return;
926                        }
927                    }
928                },
929                Err(_) => {
930                    //异步值正在设置或已完成设置,则立即返回
931                    return;
932                },
933                Ok(_) => {
934                    //异步值的唤醒器已就绪且获取到锁,则立即退出自旋
935                    break;
936                }
937            }
938        }
939
940        //已锁且获取到锁,则设置异步值,将状态设置为已完成设置,并立即唤醒异步值
941        unsafe { *self.0.value.get() = Some(value); }
942        self.0.status.store(3, Ordering::Release);
943        let waker = unsafe { (*self.0.waker.get()).take().unwrap() };
944        waker.wake();
945    }
946}
947
948// 同步非阻塞的内部异步值,只允许被同步非阻塞的设置一次值
949pub struct InnerAsyncValue<V: 'static> {
950    value:  UnsafeCell<Option<V>>,      //值
951    waker:  UnsafeCell<Option<Waker>>,  //唤醒器
952    status: AtomicU8,                   //状态
953}
954
955///
956/// 异步非阻塞可变值的守护者
957///
958pub struct AsyncVariableGuard<'a, V: 'static> {
959    value:  &'a UnsafeCell<Option<V>>,      //值
960    waker:  &'a UnsafeCell<Option<Waker>>,  //唤醒器
961    status: &'a AtomicU8,                   //值状态
962}
963
964unsafe impl<V: 'static> Send for AsyncVariableGuard<'_, V> {}
965
966impl<V: 'static> Drop for AsyncVariableGuard<'_, V> {
967    fn drop(&mut self) {
968        //当前异步可变值已锁定,则解除锁定
969        //当前异步可变值的状态为2或6,表示当前异步可变值的唤醒器未就绪并已锁定,或当前异步可变值不需要唤醒并已完成所有修改
970        //当前异步可变值的状态为3或7,表示当前异步可变值的唤醒器已就绪并已锁定,或当前异步可变值已唤醒并已完成所有修改
971        self.status.fetch_sub(2, Ordering::Relaxed);
972    }
973}
974
975impl<V: 'static> Deref for AsyncVariableGuard<'_, V> {
976    type Target = Option<V>;
977
978    fn deref(&self) -> &Self::Target {
979        unsafe {
980            &*self.value.get()
981        }
982    }
983}
984
985impl<V: 'static> DerefMut for AsyncVariableGuard<'_, V> {
986    fn deref_mut(&mut self) -> &mut Self::Target {
987        unsafe {
988            &mut *self.value.get()
989        }
990    }
991}
992
993impl<V: 'static> AsyncVariableGuard<'_, V> {
994    /// 完成异步可变值的修改
995    pub fn finish(self) {
996        //设置异步可变值的状态为已完成修改
997        if self.status.fetch_add(4, Ordering::Relaxed) == 3 {
998            if let Some(waker) = unsafe { (&mut *self.waker.get()).take() } {
999                //当前异步可变值需要唤醒,则立即唤醒异步可变值
1000                waker.wake();
1001            }
1002        }
1003    }
1004}
1005
1006///
1007/// 异步非阻塞可变值,在完成前允许被同步非阻塞的修改多次
1008///
1009pub struct AsyncVariable<V: 'static>(Arc<InnerAsyncVariable<V>>);
1010
1011unsafe impl<V: 'static> Send for AsyncVariable<V> {}
1012unsafe impl<V: 'static> Sync for AsyncVariable<V> {}
1013
1014impl<V: 'static> Clone for AsyncVariable<V> {
1015    fn clone(&self) -> Self {
1016        AsyncVariable(self.0.clone())
1017    }
1018}
1019
1020impl<V: 'static> Future for AsyncVariable<V> {
1021    type Output = V;
1022
1023    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1024        unsafe {
1025            *self.0.waker.get() = Some(cx.waker().clone()); //设置异步可变值的唤醒器准备就绪
1026        }
1027
1028        let mut spin_len = 1;
1029        loop {
1030            match self.0.status.compare_exchange(0,
1031                                                 1,
1032                                                 Ordering::Acquire,
1033                                                 Ordering::Relaxed) {
1034                Err(current) if current & 4 != 0 => {
1035                    //异步可变值已完成所有修改,则立即返回
1036                    unsafe {
1037                        let _ = (&mut *self.0.waker.get()).take(); //释放异步可变值的唤醒器
1038                        return Poll::Ready((&mut *(&self).0.value.get()).take().unwrap());
1039                    }
1040                },
1041                Err(_) => {
1042                    //还未完成值修改,则自旋等待
1043                    spin_len = spin(spin_len);
1044                },
1045                Ok(_) => {
1046                    //异步可变值已挂起
1047                    return Poll::Pending;
1048                },
1049            }
1050        }
1051    }
1052}
1053
1054impl<V: 'static> AsyncVariable<V> {
1055    /// 构建异步可变值,默认值为未就绪
1056    pub fn new() -> Self {
1057        let inner = InnerAsyncVariable {
1058            value: UnsafeCell::new(None),
1059            waker: UnsafeCell::new(None),
1060            status: AtomicU8::new(0),
1061        };
1062
1063        AsyncVariable(Arc::new(inner))
1064    }
1065
1066    /// 判断异步可变值是否已完成设置
1067    pub fn is_complete(&self) -> bool {
1068        self
1069            .0
1070            .status
1071            .load(Ordering::Acquire) & 4 != 0
1072    }
1073
1074    /// 锁住待修改的异步可变值,并返回当前异步可变值的守护者,如果异步可变值已完成修改则返回空
1075    pub fn lock(&self) -> Option<AsyncVariableGuard<V>> {
1076        let mut spin_len = 1;
1077        loop {
1078            match self
1079                .0
1080                .status
1081                .compare_exchange(1,
1082                                  3,
1083                                  Ordering::Acquire,
1084                                  Ordering::Relaxed) {
1085                Err(0) => {
1086                    //异步可变值还未就绪,则自旋等待
1087                    match self
1088                        .0
1089                        .status
1090                        .compare_exchange(0,
1091                                          2,
1092                                          Ordering::Acquire,
1093                                          Ordering::Relaxed) {
1094                        Err(1) => {
1095                            //异步可变值已就绪,则继续尝试获取锁
1096                            continue;
1097                        },
1098                        Err(2) => {
1099                            //异步可变值的唤醒器未就绪且已锁,但未获取到锁,则自旋等待
1100                            spin_len = spin(spin_len);
1101                        },
1102                        Err(3) => {
1103                            //异步可变值的唤醒器已就绪且已锁,但未获取到锁,则自旋等待
1104                            spin_len = spin(spin_len);
1105                        },
1106                        Err(_) => {
1107                            //已完成,则返回空
1108                            return None;
1109                        },
1110                        Ok(_) => {
1111                            //异步可变值的唤醒器未就绪且获取到锁,则返回异步可变值的守护者
1112                            let guard = AsyncVariableGuard {
1113                                value: &self.0.value,
1114                                waker: &self.0.waker,
1115                                status: &self.0.status,
1116                            };
1117
1118                            return Some(guard)
1119                        },
1120                    }
1121                },
1122                Err(2) => {
1123                    //异步可变值的唤醒器未就绪且已锁,但未获取到锁,则自旋等待
1124                    spin_len = spin(spin_len);
1125                },
1126                Err(3) => {
1127                    //异步可变值的唤醒器已就绪且已锁,但未获取到锁,则自旋等待
1128                    spin_len = spin(spin_len);
1129                },
1130                Err(_) => {
1131                    //已完成,则返回空
1132                    return None;
1133                }
1134                Ok(_) => {
1135                    //异步可变值的唤醒器已就绪且获取到锁,则返回异步可变值的守护者
1136                    let guard = AsyncVariableGuard {
1137                        value: &self.0.value,
1138                        waker: &self.0.waker,
1139                        status: &self.0.status,
1140                    };
1141
1142                    return Some(guard)
1143                },
1144            }
1145        }
1146    }
1147}
1148
1149// 内部异步非阻塞可变值,在完成前允许被同步非阻塞的修改多次
1150pub struct InnerAsyncVariable<V: 'static> {
1151    value:  UnsafeCell<Option<V>>,      //值
1152    waker:  UnsafeCell<Option<Waker>>,  //唤醒器
1153    status: AtomicU8,                   //状态
1154}
1155
1156///
1157/// 等待异步任务运行的结果
1158///
1159pub struct AsyncWaitResult<V: 'static>(pub Arc<RefCell<Option<Result<V>>>>);
1160
1161unsafe impl<V: 'static> Send for AsyncWaitResult<V> {}
1162unsafe impl<V: 'static> Sync for AsyncWaitResult<V> {}
1163
1164impl<V: 'static> Clone for AsyncWaitResult<V> {
1165    fn clone(&self) -> Self {
1166        AsyncWaitResult(self.0.clone())
1167    }
1168}
1169
1170///
1171/// 等待异步任务运行的结果集
1172///
1173pub struct AsyncWaitResults<V: 'static>(pub Arc<RefCell<Option<Vec<Result<V>>>>>);
1174
1175unsafe impl<V: 'static> Send for AsyncWaitResults<V> {}
1176unsafe impl<V: 'static> Sync for AsyncWaitResults<V> {}
1177
1178impl<V: 'static> Clone for AsyncWaitResults<V> {
1179    fn clone(&self) -> Self {
1180        AsyncWaitResults(self.0.clone())
1181    }
1182}
1183
1184///
1185/// 异步定时器任务
1186///
1187pub enum AsyncTimingTask<
1188    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1189    O: Default + 'static = (),
1190> {
1191    Pended(TaskId),                 //已挂起的定时任务
1192    WaitRun(Arc<AsyncTask<P, O>>),  //等待执行的定时任务
1193}
1194
1195///
1196/// 异步任务本地定时器
1197///
1198pub struct AsyncTaskTimer<
1199    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1200    O: Default + 'static = (),
1201> {
1202    producor:   Sender<(usize, AsyncTimingTask<P, O>)>,                     //定时任务生产者
1203    consumer:   Receiver<(usize, AsyncTimingTask<P, O>)>,                   //定时任务消费者
1204    timer:      Arc<RefCell<Timer<AsyncTimingTask<P, O>, 1000, 60, 3>>>,    //定时器
1205    clock:      Clock,                                                      //定时器时钟
1206    now:        QInstant,                                                   //当前时间
1207}
1208
1209unsafe impl<
1210    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1211    O: Default + 'static,
1212> Send for AsyncTaskTimer<P, O> {}
1213unsafe impl<
1214    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1215    O: Default + 'static,
1216> Sync for AsyncTaskTimer<P, O> {}
1217
1218impl<
1219    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1220    O: Default + 'static,
1221> AsyncTaskTimer<P, O> {
1222    /// 构建异步任务本地定时器
1223    pub fn new() -> Self {
1224        let (producor, consumer) = unbounded();
1225        let clock = Clock::new();
1226        let now = clock.recent();
1227
1228        AsyncTaskTimer {
1229            producor,
1230            consumer,
1231            timer: Arc::new(RefCell::new(Timer::<AsyncTimingTask<P, O>, 1000, 60, 3>::default())),
1232            clock,
1233            now,
1234        }
1235    }
1236
1237    /// 获取定时任务生产者
1238    #[inline]
1239    pub fn get_producor(&self) -> &Sender<(usize, AsyncTimingTask<P, O>)> {
1240        &self.producor
1241    }
1242
1243    /// 获取剩余未到期的定时器任务数量
1244    #[inline]
1245    pub fn len(&self) -> usize {
1246        let timer = self.timer.as_ref().borrow();
1247        timer.add_count() - timer.remove_count()
1248    }
1249
1250    /// 设置定时器
1251    pub fn set_timer(&self, task: AsyncTimingTask<P, O>, timeout: usize) -> usize {
1252        let current_time = self
1253            .clock
1254            .recent()
1255            .duration_since(self.now)
1256            .as_millis() as u64;
1257        self
1258            .timer
1259            .borrow_mut()
1260            .push_time(current_time + timeout as u64, task)
1261            .data()
1262            .as_ffi() as usize
1263    }
1264
1265    /// 取消定时器
1266    pub fn cancel_timer(&self, timer_ref: usize) -> Option<AsyncTimingTask<P, O>> {
1267        if let Some(item) =self
1268            .timer
1269            .borrow_mut()
1270            .cancel(KeyData::from_ffi(timer_ref as u64).into()) {
1271            Some(item)
1272        } else {
1273            None
1274        }
1275    }
1276
1277    /// 消费所有定时任务,返回定时任务数量
1278    pub fn consume(&self) -> usize {
1279        let mut len = 0;
1280        let timer_tasks = self.consumer.try_iter().collect::<Vec<(usize, AsyncTimingTask<P, O>)>>();
1281        for (timeout, task) in timer_tasks {
1282            self.set_timer(task, timeout);
1283            len += 1;
1284        }
1285
1286        len
1287    }
1288
1289    /// 判断当前时间是否有可以弹出的任务,如果有可以弹出的任务,则返回当前时间,否则返回空
1290    pub fn is_require_pop(&self) -> Option<u64> {
1291        let current_time = self
1292            .clock
1293            .recent()
1294            .duration_since(self.now)
1295            .as_millis() as u64;
1296        if self.timer.borrow_mut().is_ok(current_time) {
1297            Some(current_time)
1298        } else {
1299            None
1300        }
1301    }
1302
1303    /// 从定时器中弹出指定时间的一个到期任务
1304    pub fn pop(&self, current_time: u64) -> Option<(usize, AsyncTimingTask<P, O>)> {
1305        if let Some((key, item)) = self.timer.borrow_mut().pop_kv(current_time) {
1306            Some((key.data().as_ffi() as usize, item))
1307        } else {
1308            None
1309        }
1310    }
1311}
1312
1313///
1314/// 等待指定超时
1315///
1316pub struct AsyncWaitTimeout<
1317    RT: AsyncRuntime<O>,
1318    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1319    O: Default + 'static = (),
1320> {
1321    rt:         RT,                                     //当前运行时
1322    producor:   Sender<(usize, AsyncTimingTask<P, O>)>, //超时请求生产者
1323    timeout:    usize,                                  //超时时长,单位ms
1324    expired:    AtomicBool,                             //是否已过期
1325}
1326
1327unsafe impl<
1328    RT: AsyncRuntime<O>,
1329    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1330    O: Default + 'static,
1331> Send for AsyncWaitTimeout<RT, P, O> {}
1332unsafe impl<
1333    RT: AsyncRuntime<O>,
1334    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1335    O: Default + 'static,
1336> Sync for AsyncWaitTimeout<RT, P, O> {}
1337
1338impl<
1339    RT: AsyncRuntime<O>,
1340    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1341    O: Default + 'static,
1342> Future for AsyncWaitTimeout<RT, P, O> {
1343    type Output = ();
1344
1345    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1346        if (&self).expired.load(Ordering::Relaxed) {
1347            //已到期,则返回
1348            return Poll::Ready(());
1349        } else {
1350            //未到期,则设置为已到期
1351            (&self).expired.store(true, Ordering::Relaxed);
1352        }
1353
1354        let task_id = self.rt.alloc::<O>();
1355        let reply = self.rt.pending(&task_id, cx.waker().clone());
1356
1357        //发送超时请求,并返回
1358        (&self).producor.send(((&self).timeout, AsyncTimingTask::Pended(task_id)));
1359        reply
1360    }
1361}
1362
1363impl<
1364    RT: AsyncRuntime<O>,
1365    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1366    O: Default + 'static,
1367> AsyncWaitTimeout<RT, P, O> {
1368    /// 构建等待指定超时任务的方法
1369    pub fn new(rt: RT,
1370               producor: Sender<(usize, AsyncTimingTask<P, O>)>,
1371               timeout: usize) -> Self {
1372        AsyncWaitTimeout {
1373            rt,
1374            producor,
1375            timeout,
1376            expired: AtomicBool::new(false), //设置初始值
1377        }
1378    }
1379}
1380
1381///
1382/// 等待异步任务执行完成
1383///
1384pub struct AsyncWait<V: 'static>(AsyncWaitAny<V>);
1385
1386unsafe impl<V: 'static> Send for AsyncWait<V> {}
1387unsafe impl<V: 'static> Sync for AsyncWait<V> {}
1388
1389/*
1390* 等待异步任务执行完成同步方法
1391*/
1392impl<V: 'static> AsyncWait<V> {
1393    /// 构建等待异步任务执行完成
1394    pub(crate) fn new(inner: AsyncWaitAny<V>) -> Self {
1395        AsyncWait(inner)
1396    }
1397
1398    /// 派发指定超时时间的指定任务到指定的运行时,并返回派发是否成功
1399    pub fn spawn<RT, O, F>(&self,
1400                           rt: RT,
1401                           timeout: Option<usize>,
1402                           future: F) -> Result<()>
1403        where RT: AsyncRuntime<O>,
1404              O: Default + 'static,
1405              F: Future<Output = Result<V>> + 'static {
1406        self.0.spawn(rt.clone(), future)?;
1407
1408        if let Some(timeout) = timeout {
1409            //设置了超时时间
1410            let rt_copy = rt.clone();
1411            self.0.spawn(rt, async move {
1412                rt_copy.timeout(timeout).await;
1413
1414                //返回超时错误
1415                Err(Error::new(ErrorKind::TimedOut, format!("Time out")))
1416            })
1417        } else {
1418            //未设置超时时间
1419            Ok(())
1420        }
1421    }
1422
1423    /// 派发指定超时时间的指定任务到本地运行时,并返回派发是否成功
1424    pub fn spawn_local<O, F>(&self,
1425                             timeout: Option<usize>,
1426                             future: F) -> Result<()>
1427        where O: Default + 'static,
1428              F: Future<Output = Result<V>> + 'static {
1429        if let Some(rt) = local_serial_async_runtime::<O>() {
1430            //当前线程有绑定运行时
1431            self.0.spawn_local(future)?;
1432
1433            if let Some(timeout) = timeout {
1434                //设置了超时时间
1435                let rt_copy = rt.clone();
1436                self.0.spawn_local(async move {
1437                    rt_copy.timeout(timeout).await;
1438
1439                    //返回超时错误
1440                    Err(Error::new(ErrorKind::TimedOut, format!("Time out")))
1441                })
1442            } else {
1443                //未设置超时时间
1444                Ok(())
1445            }
1446        } else {
1447            //当前线程未绑定运行时
1448            Err(Error::new(ErrorKind::Other, format!("Spawn wait task failed, reason: local async runtime not exist")))
1449        }
1450    }
1451}
1452
1453/*
1454* 等待异步任务执行完成异步方法
1455*/
1456impl<V: 'static> AsyncWait<V> {
1457    /// 异步等待已派发任务的结果
1458    pub async fn wait_result(self) -> Result<V> {
1459        self.0.wait_result().await
1460    }
1461}
1462
1463///
1464/// 等待任意异步任务执行完成
1465///
1466pub struct AsyncWaitAny<V: 'static> {
1467    capacity:       usize,                      //派发任务的容量
1468    producor:       AsyncSender<Result<V>>,     //异步返回值生成器
1469    consumer:       AsyncReceiver<Result<V>>,   //异步返回值接收器
1470}
1471
1472unsafe impl<V: 'static> Send for AsyncWaitAny<V> {}
1473unsafe impl<V: 'static> Sync for AsyncWaitAny<V> {}
1474
1475/*
1476* 等待任意异步任务执行完成同步方法
1477*/
1478impl<V: 'static> AsyncWaitAny<V> {
1479    /// 构建等待任意异步任务执行完成
1480    pub(crate) fn new(capacity: usize,
1481                      producor: AsyncSender<Result<V>>,
1482                      consumer: AsyncReceiver<Result<V>>) -> Self {
1483        AsyncWaitAny {
1484            capacity,
1485            producor,
1486            consumer,
1487        }
1488    }
1489
1490    /// 派发指定任务到指定的运行时,并返回派发是否成功
1491    pub fn spawn<RT, O, F>(&self,
1492                           rt: RT,
1493                           future: F) -> Result<()>
1494        where RT: AsyncRuntime<O>,
1495              O: Default + 'static,
1496              F: Future<Output = Result<V>> + 'static {
1497        let producor = self.producor.clone();
1498        rt.spawn_by_id(rt.alloc::<O>(), async move {
1499                    let value = future.await;
1500                    producor.into_send_async(value).await;
1501
1502                    //返回异步任务的默认值
1503                    Default::default()
1504                })
1505    }
1506
1507    /// 派发指定任务到本地运行时,并返回派发是否成功
1508    pub fn spawn_local<F>(&self,
1509                          future: F) -> Result<()>
1510        where F: Future<Output = Result<V>> + 'static {
1511        if let Some(rt) = local_serial_async_runtime() {
1512            //本地线程有绑定运行时
1513            let producor = self.producor.clone();
1514            rt.spawn(async move {
1515                let value = future.await;
1516                producor.into_send_async(value).await;
1517            })
1518        } else {
1519            //本地线程未绑定运行时
1520            Err(Error::new(ErrorKind::Other, format!("Spawn wait any task failed, reason: local async runtime not exist")))
1521        }
1522    }
1523}
1524
1525/*
1526* 等待任意异步任务执行完成异步方法
1527*/
1528impl<V: 'static> AsyncWaitAny<V> {
1529    /// 异步等待任意已派发任务的结果
1530    pub async fn wait_result(self) -> Result<V> {
1531        match self.consumer.recv_async().await {
1532            Err(e) => {
1533                //接收错误,则立即返回
1534                Err(Error::new(ErrorKind::Other, format!("Wait any result failed, reason: {:?}", e)))
1535            },
1536            Ok(result) => {
1537                //接收成功,则立即返回
1538                result
1539            },
1540        }
1541    }
1542}
1543
1544///
1545/// 等待任意异步任务执行完成
1546///
1547pub struct AsyncWaitAnyCallback<V: 'static> {
1548    capacity:   usize,                      //派发任务的容量
1549    producor:   AsyncSender<Result<V>>,     //异步返回值生成器
1550    consumer:   AsyncReceiver<Result<V>>,   //异步返回值接收器
1551}
1552
1553unsafe impl<V: 'static> Send for AsyncWaitAnyCallback<V> {}
1554unsafe impl<V: 'static> Sync for AsyncWaitAnyCallback<V> {}
1555
1556/*
1557* 等待任意异步任务执行完成同步方法
1558*/
1559impl<V: 'static> AsyncWaitAnyCallback<V> {
1560    /// 构建等待任意异步任务执行完成
1561    pub(crate) fn new(capacity: usize,
1562                      producor: AsyncSender<Result<V>>,
1563                      consumer: AsyncReceiver<Result<V>>) -> Self {
1564        AsyncWaitAnyCallback {
1565            capacity,
1566            producor,
1567            consumer,
1568        }
1569    }
1570
1571    /// 派发指定任务到指定的运行时,并返回派发是否成功
1572    pub fn spawn<RT, O, F>(&self,
1573                           rt: RT,
1574                           future: F) -> Result<()>
1575        where RT: AsyncRuntime<O>,
1576              O: Default + 'static,
1577              F: Future<Output = Result<V>> + 'static {
1578        let producor = self.producor.clone();
1579        rt.spawn_by_id(rt.alloc::<O>(), async move {
1580                    let value = future.await;
1581                    producor.into_send_async(value).await;
1582
1583                    //返回异步任务的默认值
1584                    Default::default()
1585                })
1586    }
1587
1588    /// 派发指定任务到本地运行时,并返回派发是否成功
1589    pub fn spawn_local<F>(&self,
1590                          future: F) -> Result<()>
1591        where F: Future<Output = Result<V>> + 'static {
1592        if let Some(rt) = local_serial_async_runtime() {
1593            //当前线程有绑定运行时
1594            let producor = self.producor.clone();
1595            rt.spawn(async move {
1596                let value = future.await;
1597                producor.into_send_async(value).await;
1598            })
1599        } else {
1600            //当前线程未绑定运行时
1601            Err(Error::new(ErrorKind::Other, format!("Spawn wait any task failed by callback, reason: current async runtime not exist")))
1602        }
1603    }
1604}
1605
1606/*
1607* 等待任意异步任务执行完成异步方法
1608*/
1609impl<V: 'static> AsyncWaitAnyCallback<V> {
1610    /// 异步等待满足用户回调需求的已派发任务的结果
1611    pub async fn wait_result(mut self,
1612                             callback: impl Fn(&Result<V>) -> bool + 'static) -> Result<V> {
1613        let checker = create_checker(self.capacity, callback);
1614        loop {
1615            match self.consumer.recv_async().await {
1616                Err(e) => {
1617                    //接收错误,则立即返回
1618                    return Err(Error::new(ErrorKind::Other, format!("Wait any result failed by callback, reason: {:?}", e)));
1619                },
1620                Ok(result) => {
1621                    //接收成功,则检查是否立即返回
1622                    if checker(&result) {
1623                        //检查通过,则立即唤醒等待的任务,否则等待其它任务唤醒
1624                        return result;
1625                    }
1626                },
1627            }
1628        }
1629    }
1630}
1631
1632// 根据用户提供的回调,生成检查器
1633fn create_checker<V, F>(len: usize,
1634                        callback: F) -> Arc<dyn Fn(&Result<V>) -> bool + 'static>
1635    where V: 'static,
1636          F: Fn(&Result<V>) -> bool + 'static {
1637    let mut check_counter = AtomicUsize::new(len); //初始化检查计数器
1638    Arc::new(move |result| {
1639        if check_counter.fetch_sub(1, Ordering::SeqCst) == 1 {
1640            //最后一个任务的检查,则忽略用户回调,并立即返回成功
1641            true
1642        } else {
1643            //不是最后一个任务的检查,则调用用户回调,并根据用户回调确定是否成功
1644            callback(result)
1645        }
1646    })
1647}
1648
1649///
1650/// 异步映射归并
1651///
1652pub struct AsyncMapReduce<V: 'static> {
1653    count:          usize,                              //派发的任务数量
1654    capacity:       usize,                              //派发任务的容量
1655    producor:       AsyncSender<(usize, Result<V>)>,    //异步返回值生成器
1656    consumer:       AsyncReceiver<(usize, Result<V>)>,  //异步返回值接收器
1657}
1658
1659unsafe impl<V: 'static> Send for AsyncMapReduce<V> {}
1660
1661/*
1662* 异步映射归并同步方法
1663*/
1664impl<V: 'static> AsyncMapReduce<V> {
1665    /// 构建异步映射归并
1666    pub(crate) fn new(count: usize,
1667                      capacity: usize,
1668                      producor: AsyncSender<(usize, Result<V>)>,
1669                      consumer: AsyncReceiver<(usize, Result<V>)>) -> Self {
1670        AsyncMapReduce {
1671            count,
1672            capacity,
1673            producor,
1674            consumer,
1675        }
1676    }
1677
1678    /// 映射指定任务到指定的运行时,并返回任务序号
1679    pub fn map<RT, O, F>(&mut self, rt: RT, future: F) -> Result<usize>
1680        where RT: AsyncRuntime<O>,
1681              O: Default + 'static,
1682              F: Future<Output = Result<V>> + 'static {
1683        if self.count >= self.capacity {
1684            //已派发任务已达可派发任务的限制,则返回错误
1685            return Err(Error::new(ErrorKind::Other, format!("Map task to runtime failed, capacity: {}, reason: out of capacity", self.capacity)));
1686        }
1687
1688        let index = self.count;
1689        let producor = self.producor.clone();
1690        rt.spawn(async move {
1691                    let value = future.await;
1692                    producor.into_send_async((index, value)).await;
1693
1694                    //返回异步任务的默认值
1695                    Default::default()
1696                })?;
1697
1698        self.count += 1; //派发任务成功,则计数
1699        Ok(index)
1700    }
1701}
1702
1703/*
1704* 异步映射归并异步方法
1705*/
1706impl<V: 'static> AsyncMapReduce<V> {
1707    /// 归并所有派发的任务
1708    pub async fn reduce(self, order: bool) -> Result<Vec<Result<V>>> {
1709        let mut count = self.count;
1710        let mut results = Vec::with_capacity(count);
1711        while count > 0 {
1712            match self.consumer.recv_async().await {
1713                Err(e) => {
1714                    //接收错误,则立即返回
1715                    return Err(Error::new(ErrorKind::Other, format!("Reduce result failed, reason: {:?}", e)));
1716                },
1717                Ok((index, result)) => {
1718                    //接收成功,则继续
1719                    results.push((index, result));
1720                    count -= 1;
1721                },
1722            }
1723        }
1724
1725        if order {
1726            //需要对结果集进行排序
1727            results.sort_by_key(|(key, _value)| {
1728                key.clone()
1729            });
1730        }
1731        let (_, values) = results
1732            .into_iter()
1733            .unzip::<usize, Result<V>, Vec<usize>, Vec<Result<V>>>();
1734
1735        Ok(values)
1736    }
1737}
1738
1739///
1740/// 派发一个工作线程
1741/// 返回线程的句柄,可以通过句柄关闭线程
1742/// 线程在没有任务可以执行时会休眠,当派发任务或唤醒任务时会自动唤醒线程
1743///
1744pub fn spawn_worker_thread<F0, F1>(thread_name: &str,
1745                                   thread_stack_size: usize,
1746                                   thread_handler: Arc<AtomicBool>,
1747                                   thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>, //用于唤醒运行时所在线程的条件变量
1748                                   sleep_timeout: u64,                                  //休眠超时时长,单位毫秒
1749                                   loop_interval: Option<u64>,                          //工作者线程循环的间隔时长,None为无间隔,单位毫秒
1750                                   loop_func: F0,
1751                                   get_queue_len: F1) -> Arc<AtomicBool>
1752    where F0: Fn() -> (bool, Duration) + Send + 'static,
1753          F1: Fn() -> usize + Send + 'static {
1754    let thread_status_copy = thread_handler.clone();
1755
1756    thread::Builder::new()
1757        .name(thread_name.to_string())
1758        .stack_size(thread_stack_size)
1759        .spawn(move || {
1760            let mut sleep_count = 0;
1761
1762            while thread_handler.load(Ordering::Relaxed) {
1763                let (is_no_task, run_time) = loop_func();
1764
1765                if is_no_task {
1766                    //当前没有任务
1767                    if sleep_count > 1 {
1768                        //当前没有任务连续达到2次,则休眠线程
1769                        sleep_count = 0; //重置休眠计数
1770                        let (is_sleep, lock, condvar) = &*thread_waker;
1771                        let mut locked = lock.lock();
1772                        if get_queue_len() > 0 {
1773                            //当前有任务,则继续工作
1774                            continue;
1775                        }
1776
1777                        if !is_sleep.load(Ordering::Relaxed) {
1778                            //如果当前未休眠,则休眠
1779                            is_sleep.store(true, Ordering::SeqCst);
1780                            if condvar
1781                                .wait_for(
1782                                    &mut locked,
1783                                    Duration::from_millis(sleep_timeout),
1784                                )
1785                                .timed_out()
1786                            {
1787                                //条件超时唤醒,则设置状态为未休眠
1788                                is_sleep.store(false, Ordering::SeqCst);
1789                            }
1790                        }
1791
1792                        continue; //唤醒后立即尝试执行任务
1793                    }
1794
1795                    sleep_count += 1; //休眠计数
1796                    if let Some(interval) = &loop_interval {
1797                        //设置了循环间隔时长
1798                        if let Some(remaining_interval) = Duration::from_millis(*interval).checked_sub(run_time){
1799                            //本次运行少于循环间隔,则休眠剩余的循环间隔,并继续执行任务
1800                            thread::sleep(remaining_interval);
1801                        }
1802                    }
1803                } else {
1804                    //当前有任务
1805                    sleep_count = 0; //重置休眠计数
1806                    if let Some(interval) = &loop_interval {
1807                        //设置了循环间隔时长
1808                        if let Some(remaining_interval) = Duration::from_millis(*interval).checked_sub(run_time){
1809                            //本次运行少于循环间隔,则休眠剩余的循环间隔,并继续执行任务
1810                            thread::sleep(remaining_interval);
1811                        }
1812                    }
1813                }
1814            }
1815    });
1816
1817    thread_status_copy
1818}
1819
1820/// 唤醒工作者所在线程,如果线程当前正在运行,则忽略
1821pub fn wakeup_worker_thread<O, P>(worker_waker: &Arc<(AtomicBool, Mutex<()>, Condvar)>,
1822                                  rt: &SingleTaskRuntime<O, P>)
1823    where O: Default + 'static,
1824          P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P> {
1825    //检查工作者所在线程是否需要唤醒
1826    if worker_waker.0.load(Ordering::Relaxed) && rt.len() > 0 {
1827        let (is_sleep, lock, condvar) = &**worker_waker;
1828        let locked = lock.lock();
1829        is_sleep.store(false, Ordering::SeqCst); //设置为未休眠
1830        let _ = condvar.notify_one();
1831    }
1832}