pi_async_rt/rt/
multi_thread.rs

1//! # 多线程运行时
2//!
3//! - [ComputationalTaskPool]\: 计算型的多线程任务池,适合用于Cpu密集型的应用,
4//!   不支持运行时伸缩
5//! - [StealableTaskPool]\:
6//!   可窃取的多线程任务池,适合用于block较多的应用,支持运行时伸缩
7//! - [MultiTaskRuntime]\: 异步多线程任务运行时,支持运行时线程伸缩
8//! - [MultiTaskRuntimeBuilder]\: 异步多线程任务运行时构建器
9//!
10//! [ComputationalTaskPool]: struct.ComputationalTaskPool.html
11//! [StealableTaskPool]: struct.StealableTaskPool.html
12//! [MultiTaskRuntime]: struct.MultiTaskRuntime.html
13//! [MultiTaskRuntimeBuilder]: struct.MultiTaskRuntimeBuilder.html
14//!
15//! # Examples
16//!
17//! ```
18//! use pi_async::prelude::{MultiTaskRuntime, MultiTaskRuntimeBuilder, StealableTaskPool};
19//! use pi_async::rt::AsyncRuntimeExt;
20//!
21//! let pool = StealableTaskPool::with(4,100000,[1, 254],3000);
22//! let builer = MultiTaskRuntimeBuilder::new(pool)
23//!     .set_timer_interval(1)
24//!     .init_worker_size(4)
25//!     .set_worker_limit(4, 4);
26//! let rt = builer.build();
27//! let _ = rt.spawn(async move {});
28//! ```
29
30use std::sync::Arc;
31use std::vec::IntoIter;
32use std::time::Duration;
33use std::future::Future;
34use std::cell::UnsafeCell;
35use std::marker::PhantomData;
36use std::io::{Error, ErrorKind, Result};
37use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
38use std::task::{Context, Poll, Waker};
39use std::thread::{self, Builder};
40
41use async_stream::stream;
42use crossbeam_channel::{bounded, Sender};
43use crossbeam_deque::{Injector, Steal, Stealer, Worker};
44use crossbeam_queue::{ArrayQueue, SegQueue};
45use st3::{StealError,
46          fifo::{Worker as FIFOWorker, Stealer as FIFOStealer}};
47use flume::bounded as async_bounded;
48use futures::{
49    future::{BoxFuture, FutureExt},
50    stream::{BoxStream, Stream, StreamExt},
51    task::waker_ref,
52    TryFuture,
53};
54use parking_lot::{Condvar, Mutex};
55use rand::{Rng, thread_rng};
56use num_cpus;
57use wrr::IWRRSelector;
58use quanta::{Clock, Instant as QInstant};
59use log::warn;
60
61use super::{
62    PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME, PI_ASYNC_THREAD_LOCAL_ID, DEFAULT_MAX_HIGH_PRIORITY_BOUNDED, DEFAULT_HIGH_PRIORITY_BOUNDED, DEFAULT_MAX_LOW_PRIORITY_BOUNDED, alloc_rt_uid, local_async_runtime, AsyncMapReduce, AsyncPipelineResult, AsyncRuntime,
63    AsyncRuntimeExt, AsyncTask, AsyncTaskPool, AsyncTaskPoolExt, AsyncTaskTimerByNotCancel, AsyncTimingTask,
64    AsyncWait, AsyncWaitAny, AsyncWaitAnyCallback, AsyncWaitTimeout, LocalAsyncWaitTimeout, LocalAsyncRuntime, TaskId, TaskHandle, YieldNow
65};
66
67/*
68* 默认的初始工作者数量
69*/
70#[cfg(not(target_arch = "wasm32"))]
71const DEFAULT_INIT_WORKER_SIZE: usize = 2;
72#[cfg(target_arch = "wasm32")]
73const DEFAULT_INIT_WORKER_SIZE: usize = 1;
74
75/*
76* 默认的工作者线程名称前缀
77*/
78const DEFAULT_WORKER_THREAD_PREFIX: &str = "Default-Multi-RT";
79
80/*
81* 默认的线程栈大小
82*/
83const DEFAULT_THREAD_STACK_SIZE: usize = 1024 * 1024;
84
85/*
86* 默认的工作者线程空闲休眠时长,单位ms
87*/
88const DEFAULT_WORKER_THREAD_SLEEP_TIME: u64 = 10;
89
90/*
91* 默认的运行时空闲休眠时长,单位ms,运行时空闲是指绑定当前运行时的队列为空,且定时器内未到期的任务为空
92*/
93const DEFAULT_RUNTIME_SLEEP_TIME: u64 = 1000;
94
95/*
96* 默认的最大权重
97*/
98const DEFAULT_MAX_WEIGHT: u8 = 254;
99
100/*
101* 默认的最小权重
102*/
103const DEFAULT_MIN_WEIGHT: u8 = 1;
104
105///
106/// 计算型的工作者任务队列
107///
108struct ComputationalTaskQueue<O: Default + 'static> {
109    stack: Worker<Arc<AsyncTask<ComputationalTaskPool<O>, O>>>,     //工作者任务栈
110    queue: SegQueue<Arc<AsyncTask<ComputationalTaskPool<O>, O>>>,   //工作者任务队列
111    thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>,            //工作者线程的唤醒器
112}
113
114impl<O: Default + 'static> ComputationalTaskQueue<O> {
115    //构建计算型的工作者任务队列
116    pub fn new(thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) -> Self {
117        let stack = Worker::new_lifo();
118        let queue = SegQueue::new();
119
120        ComputationalTaskQueue {
121            stack,
122            queue,
123            thread_waker,
124        }
125    }
126
127    //获取计算型的工作者任务队列的任务数量
128    pub fn len(&self) -> usize {
129        self.stack.len() + self.queue.len()
130    }
131}
132
133///
134/// 计算型的多线程任务池,适合用于Cpu密集型的应用,不支持运行时伸缩
135///
136pub struct ComputationalTaskPool<O: Default + 'static> {
137    workers: Vec<ComputationalTaskQueue<O>>, //工作者的任务队列列表
138    waits: Option<Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>>, //待唤醒的工作者唤醒器队列
139    consume_count: Arc<AtomicUsize>,                                       //任务消费计数
140    produce_count: Arc<AtomicUsize>,                                       //任务生产计数
141}
142
143unsafe impl<O: Default + 'static> Send for ComputationalTaskPool<O> {}
144unsafe impl<O: Default + 'static> Sync for ComputationalTaskPool<O> {}
145
146impl<O: Default + 'static> Default for ComputationalTaskPool<O> {
147    fn default() -> Self {
148        #[cfg(not(target_arch = "wasm32"))]
149        let core_len = num_cpus::get(); //工作者任务池数据等于本机逻辑核数
150        #[cfg(target_arch = "wasm32")]
151        let core_len = 1; //工作者任务池数据等于1
152        ComputationalTaskPool::new(core_len)
153    }
154}
155
156impl<O: Default + 'static> AsyncTaskPool<O> for ComputationalTaskPool<O> {
157    type Pool = ComputationalTaskPool<O>;
158
159    #[inline]
160    fn get_thread_id(&self) -> usize {
161        match PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| unsafe { *thread_id.get() }) {
162            Err(e) => {
163                //不应该执行到这个分支
164                panic!(
165                    "Get thread id failed, thread: {:?}, reason: {:?}",
166                    thread::current(),
167                    e
168                );
169            }
170            Ok(id) => id,
171        }
172    }
173
174    #[inline]
175    fn len(&self) -> usize {
176        if let Some(len) = self
177            .produce_count
178            .load(Ordering::Relaxed)
179            .checked_sub(self.consume_count.load(Ordering::Relaxed))
180        {
181            len
182        } else {
183            0
184        }
185    }
186
187    #[inline]
188    fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
189        let index = self.produce_count.fetch_add(1, Ordering::Relaxed) % self.workers.len();
190        self.workers[index].queue.push(task);
191        Ok(())
192    }
193
194    #[inline]
195    fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
196        let id = self.get_thread_id();
197        let rt_uid = task.owner();
198        if (id >> 32) == rt_uid {
199            //当前是运行时所在线程
200            let worker = &self.workers[id & 0xffffffff];
201            worker.queue.push(task);
202
203            self.produce_count.fetch_add(1, Ordering::Relaxed);
204            Ok(())
205        } else {
206            //当前不是运行时所在线程
207            self.push(task)
208        }
209    }
210
211    #[inline]
212    fn push_priority(&self,
213                     priority: usize,
214                     task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
215        if priority >= DEFAULT_MAX_HIGH_PRIORITY_BOUNDED {
216            //最高优先级
217            let id = self.get_thread_id();
218            let rt_uid = task.owner();
219            if (id >> 32) == rt_uid {
220                let worker = &self.workers[id & 0xffffffff];
221                worker.stack.push(task);
222
223                self.produce_count.fetch_add(1, Ordering::Relaxed);
224                Ok(())
225            } else {
226                self.push(task)
227            }
228        } else if priority >= DEFAULT_HIGH_PRIORITY_BOUNDED {
229            //高优先级
230            self.push_local(task)
231        } else {
232            //低优先级
233            self.push(task)
234        }
235    }
236
237    #[inline]
238    fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
239        self.push_priority(DEFAULT_HIGH_PRIORITY_BOUNDED, task)
240    }
241
242    #[inline]
243    fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>> {
244        let id = self.get_thread_id() & 0xffffffff;
245        let worker = &self.workers[id];
246        let task = worker.stack.pop();
247        if task.is_some() {
248            //指定工作者的任务栈有任务,则立即返回任务
249            self.consume_count.fetch_add(1, Ordering::Relaxed);
250            return task;
251        }
252
253        let task = worker.queue.pop();
254        if task.is_some() {
255            self.consume_count.fetch_add(1, Ordering::Relaxed);
256        }
257
258        task
259    }
260
261    #[inline]
262    fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>> {
263        let mut tasks = Vec::with_capacity(self.len());
264        while let Some(task) = self.try_pop() {
265            tasks.push(task);
266        }
267
268        tasks.into_iter()
269    }
270
271    #[inline]
272    fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>> {
273        //多线程任务运行时不支持此方法
274        None
275    }
276}
277
278impl<O: Default + 'static> AsyncTaskPoolExt<O> for ComputationalTaskPool<O> {
279    #[inline]
280    fn set_waits(&mut self, waits: Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>) {
281        self.waits = Some(waits);
282    }
283
284    #[inline]
285    fn get_waits(&self) -> Option<&Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>> {
286        self.waits.as_ref()
287    }
288
289    #[inline]
290    fn worker_len(&self) -> usize {
291        self.workers.len()
292    }
293
294    #[inline]
295    fn clone_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
296        let worker = &self.workers[self.get_thread_id() & 0xffffffff];
297        Some(worker.thread_waker.clone())
298    }
299}
300
301impl<O: Default + 'static> ComputationalTaskPool<O> {
302    //构建指定数量的工作者的计算型的多线程任务池
303    pub fn new(mut size: usize) -> Self {
304        if size < DEFAULT_INIT_WORKER_SIZE {
305            //工作者数量过少,则设置为默认的工作者数量
306            size = DEFAULT_INIT_WORKER_SIZE;
307        }
308
309        let mut workers = Vec::with_capacity(size);
310        for _ in 0..size {
311            let thread_waker = Arc::new((AtomicBool::new(false), Mutex::new(()), Condvar::new()));
312            let worker = ComputationalTaskQueue::new(thread_waker);
313            workers.push(worker);
314        }
315        let consume_count = Arc::new(AtomicUsize::new(0));
316        let produce_count = Arc::new(AtomicUsize::new(0));
317
318        ComputationalTaskPool {
319            workers,
320            waits: None,
321            consume_count,
322            produce_count,
323        }
324    }
325}
326
327///
328/// 可窃取的混合任务队列
329///
330struct StealableTaskQueue<O: Default + 'static> {
331    stack:          UnsafeCell<Option<Arc<AsyncTask<StealableTaskPool<O>, O>>>>,    //工作者任务栈
332    internal:       FIFOWorker<Arc<AsyncTask<StealableTaskPool<O>, O>>>,            //工作者本地内部任务队列,可窃取
333    external:       Worker<Arc<AsyncTask<StealableTaskPool<O>, O>>>,                //工作者本地外部任务队列,可窃取
334    selector:       UnsafeCell<IWRRSelector<2>>,                                    //工作者任务队列选择器
335    thread_waker:   Arc<(AtomicBool, Mutex<()>, Condvar)>,                          //工作者线程的唤醒器
336}
337
338impl<O: Default + 'static> StealableTaskQueue<O> {
339    // 构建可窃取的混合任务队列,允许设置初始的栈和队列的初始容量,并自动设置栈和队列的容量
340    // 栈和队列的容量是初始容量的最小二次方,例如初始容量为0,则容量为1
341    pub fn new(
342        init_queue_capacity: usize,
343        thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>,
344    ) -> (Self,
345          FIFOStealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>,
346          Stealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>) {
347        let stack = UnsafeCell::new(None);
348        let internal = FIFOWorker::new(init_queue_capacity);
349        let external = Worker::new_fifo();
350        let internal_stealer = internal.stealer();
351        let external_stealer = external.stealer();
352        let selector = UnsafeCell::new(IWRRSelector::new([2, 1]));
353
354        (
355            StealableTaskQueue {
356                stack,
357                internal,
358                external,
359                selector,
360                thread_waker,
361            },
362            internal_stealer,
363            external_stealer
364        )
365    }
366
367    // 获取栈容量
368    pub const fn stack_capacity(&self) -> usize {
369        1
370    }
371
372    // 获取本地内部任务队列容量
373    pub fn internal_capacity(&self) -> usize {
374        self.internal.capacity()
375    }
376
377    // 获取剩余的本地内部任务队列容量,不准确
378    pub fn remaining_internal_capacity(&self) -> usize {
379        self.internal.spare_capacity()
380    }
381
382    // 获取栈的长度
383    #[inline]
384    pub fn stack_len(&self) -> usize {
385        unsafe {
386            if (&*self.stack.get()).is_some() {
387                1
388            } else {
389                0
390            }
391        }
392    }
393
394    // 获取本地内部任务队列长度
395    pub fn internal_len(&self) -> usize {
396        self
397            .internal_capacity()
398            .checked_sub(self.remaining_internal_capacity())
399            .unwrap_or(0)
400    }
401
402    // 获取本地外部任务队列长度
403    pub fn external_len(&self) -> usize {
404        self.external.len()
405    }
406}
407
408///
409/// 可窃取的混合任务池
410///
411pub struct StealableTaskPool<O: Default + 'static> {
412    public:                         Injector<Arc<AsyncTask<StealableTaskPool<O>, O>>>,          //公共的任务池
413    workers:                        Vec<StealableTaskQueue<O>>,                                 //工作者的任务队列列表
414    internal_stealers:              Vec<FIFOStealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>>,  //工作者任务队列的本地内部任务窃取者
415    external_stealers:              Vec<Stealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>>,      //工作者任务队列的本地外部任务窃取者
416    internal_consume:               AtomicUsize,                                                //内部任务消费计数
417    internal_produce:               AtomicUsize,                                                //内部任务生产计数
418    internal_traffic_statistics:    AtomicUsize,                                                //内部任务流量统计
419    external_consume:               AtomicUsize,                                                //外部任务消费计数
420    external_produce:               AtomicUsize,                                                //外部任务生产计数
421    external_traffic_statistics:    AtomicUsize,                                                //外部任务流量统计
422    weights:                        [u8; 2],                                                    //工作者任务队列的权重
423    clock:                          Clock,                                                      //任务池的时钟
424    interval:                       usize,                                                      //整理的间隔时长,单位ms
425    last_time:                      UnsafeCell<QInstant>,                                       //上一次整理的时间
426}
427
428unsafe impl<O: Default + 'static> Send for StealableTaskPool<O> {}
429unsafe impl<O: Default + 'static> Sync for StealableTaskPool<O> {}
430
431impl<O: Default + 'static> Default for StealableTaskPool<O> {
432    fn default() -> Self {
433        StealableTaskPool::new()
434    }
435}
436
437impl<O: Default + 'static> AsyncTaskPool<O> for StealableTaskPool<O> {
438    type Pool = StealableTaskPool<O>;
439
440    #[inline]
441    fn get_thread_id(&self) -> usize {
442        match PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| unsafe { *thread_id.get() }) {
443            Err(e) => {
444                //不应该执行到这个分支
445                panic!(
446                    "Get thread id failed, thread: {:?}, reason: {:?}",
447                    thread::current(),
448                    e
449                );
450            }
451            Ok(id) => id,
452        }
453    }
454
455    #[inline]
456    fn len(&self) -> usize {
457        self.internal_produce
458            .load(Ordering::Relaxed)
459            .checked_sub(self.internal_consume.load(Ordering::Relaxed))
460            .unwrap_or(0)
461            +
462            self.external_produce
463                .load(Ordering::Relaxed)
464                .checked_sub(self.external_consume.load(Ordering::Relaxed))
465                .unwrap_or(0)
466    }
467
468    #[inline]
469    fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
470        self.public.push(task);
471
472        self
473            .external_produce
474            .fetch_add(1, Ordering::Relaxed);
475        Ok(())
476    }
477
478    #[inline]
479    fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
480        let id = self.get_thread_id();
481        let rt_uid = task.owner();
482        if (id >> 32) == rt_uid {
483            //当前是运行时所在线程
484            let worker = &self.workers[id & 0xffffffff];
485            if worker.remaining_internal_capacity() > 0 {
486                //本地内部任务队列有空闲容量,则立即将任务加入本地内部任务队列
487                let _ = worker.internal.push(task);
488
489                self
490                    .internal_produce
491                    .fetch_add(1, Ordering::Relaxed);
492                Ok(())
493            } else {
494                //本地内部任务队列没有空闲容量,则立即将任务加入公共任务池
495                self.push(task)
496            }
497        } else {
498            //当前不是运行时所在线程
499            self.push(task)
500        }
501    }
502
503    #[inline]
504    fn push_priority(&self,
505                     priority: usize,
506                     task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
507        if priority >= DEFAULT_MAX_HIGH_PRIORITY_BOUNDED {
508            //最高优先级
509            let id = self.get_thread_id();
510            let rt_uid = task.owner();
511            if (id >> 32) == rt_uid {
512                //当前是运行时所在线程
513                let worker = &self.workers[id & 0xffffffff];
514                if worker.stack_len() < 1 {
515                    //本地任务栈有空闲容量,则立即将任务加入本地任务栈
516                    unsafe {
517                        *worker.stack.get() = Some(task);
518                    }
519                } else if worker.remaining_internal_capacity() > 0 {
520                    //本地内部任务队列有空闲容量,则立即将任务加入本地内部任务队列
521                    let _ = worker.internal.push(task);
522                } else {
523                    //本地任务栈和本地内部任务队列都没有空闲容量,则立即将任务加入公共任务池
524                    return self.push(task);
525                }
526
527                self
528                    .internal_produce
529                    .fetch_add(1, Ordering::Relaxed);
530                Ok(())
531            } else {
532                //当前不是运行时所在线程
533                self.push(task)
534            }
535        } else if priority >= DEFAULT_HIGH_PRIORITY_BOUNDED {
536            //高优先级
537            self.push_local(task)
538        } else {
539            //低优先级
540            self.push(task)
541        }
542    }
543
544    #[inline]
545    fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
546        self.push_priority(DEFAULT_HIGH_PRIORITY_BOUNDED, task)
547    }
548
549    #[inline]
550    fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>> {
551        let id = self.get_thread_id() & 0xffffffff;
552        let worker = &self.workers[id];
553        let task = unsafe { (&mut *worker
554            .stack
555            .get())
556            .take()
557        };
558        if task.is_some() {
559            //指定工作者的任务栈有任务,则立即返回任务
560            return task;
561        }
562
563        //从指定工作者的任务队列中弹出任务
564        try_pop_by_weight(self, worker, id)
565    }
566
567    #[inline]
568    fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>> {
569        let mut tasks = Vec::with_capacity(self.len());
570        while let Some(task) = self.try_pop() {
571            tasks.push(task);
572        }
573
574        tasks.into_iter()
575    }
576
577    #[inline]
578    fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>> {
579        //多线程任务运行时不支持此方法
580        None
581    }
582}
583
584// 获取指定数字的MSB
585const fn get_msb(n: usize) -> usize {
586    usize::BITS as usize - n.leading_zeros() as usize
587}
588
589// 尝试通过统计信息更新权重,根据权重选择从本地外部任务队列或本地内部任务队列中弹出任务
590fn try_pop_by_weight<O: Default + 'static>(pool: &StealableTaskPool<O>,
591                                           local_worker: &StealableTaskQueue<O>,
592                                           local_worker_id: usize)
593                                           -> Option<Arc<AsyncTask<StealableTaskPool<O>, O>>> {
594    unsafe {
595        let duration = pool
596            .clock
597            .recent()
598            .duration_since(*pool.last_time.get())
599            .as_millis() as usize;
600        if duration >= pool.interval {
601            //开始整理外部任务队列和内部任务队列的任务数量,并更新权重
602            let new_external_traffic_statistics = pool
603                .external_produce
604                .load(Ordering::Relaxed);
605            let new_internal_traffic_statistics = pool
606                .internal_produce
607                .load(Ordering::Relaxed);
608
609            //获取外部任务增量和内部任务增量
610            let external_delta = if new_external_traffic_statistics == 0 {
611                //上次整理到本次整理之间,外部任务数量为空,则增量为1
612                1
613            } else {
614                //上次整理到本次整理之间,外部任务数量不为空,则计算两次整理之间的外部任务数量的增量
615                new_external_traffic_statistics
616                    .checked_sub(pool
617                        .external_traffic_statistics
618                        .load(Ordering::Relaxed))
619                    .unwrap_or(1)
620            };
621            pool
622                .external_traffic_statistics
623                .store(new_external_traffic_statistics, Ordering::Relaxed); //更新外部任务流量统计
624            let internal_delta = if new_internal_traffic_statistics == 0 {
625                //上次整理到本次整理之间,内部任务数量为空,则增量为1
626                1
627            } else {
628                //上次整理到本次整理之间,内部任务数量不为空,则计算两次整理之间的内部任务数量的增量
629                new_internal_traffic_statistics
630                    .checked_sub(pool
631                        .internal_traffic_statistics
632                        .load(Ordering::Relaxed))
633                    .unwrap_or(1)
634            };
635            pool
636                .internal_traffic_statistics
637                .store(new_internal_traffic_statistics, Ordering::Relaxed); //更新内部任务流量统计
638
639            //更新外部任务队列和内部任务队列的权重
640            let selector = &mut *local_worker.selector.get();
641            if external_delta > internal_delta {
642                //内部任务增量较小
643                let msb = get_msb(internal_delta);
644                let internal_weight
645                    = (internal_delta >> msb.checked_sub(2).unwrap_or(0)).max(1);
646                let external_weight
647                    = ((external_delta >> msb).min(DEFAULT_MAX_WEIGHT as usize)).max(1);
648
649                selector.change_weight(0, external_weight as u8);
650                selector.change_weight(1, internal_weight as u8);
651            } else if external_delta < internal_delta {
652                //外部任务增量较小
653                let msb = get_msb(external_delta);
654                let external_weight
655                    = (external_delta >> msb.checked_sub(2).unwrap_or(0)).max(1);
656                let internal_weight
657                    = ((internal_delta >> msb).min(DEFAULT_MAX_WEIGHT as usize)).max(1);
658
659                selector.change_weight(0, external_weight as u8);
660                selector.change_weight(1, internal_weight as u8);
661            } else {
662                //外部任务和内部任务增量相同
663                selector.change_weight(0, 1);
664                selector.change_weight(1, 1);
665            }
666
667            *pool.last_time.get() = pool.clock.recent(); //更新上一次整理的时间
668        }
669
670        //根据权重选择从指定的任务队列弹出任务
671        match (&mut *local_worker.selector.get()).select() {
672            0 => {
673                //弹出外部任务
674                let task = try_pop_external(pool, local_worker, local_worker_id);
675                if task.is_some() {
676                    task
677                } else {
678                    //当前没有外部任务,则尝试弹出内部任务
679                    try_pop_internal(pool, local_worker, local_worker_id)
680                }
681            },
682            _ => {
683                //弹出内部任务
684                let task = try_pop_internal(pool, local_worker, local_worker_id);
685                if task.is_some() {
686                    task
687                } else {
688                    //当前没有内部任务,则尝试弹出外部任务
689                    try_pop_external(pool, local_worker, local_worker_id)
690                }
691            },
692        }
693    }
694}
695
696// 尝试弹出内部任务队列的任务
697#[inline]
698fn try_pop_internal<O: Default + 'static>(pool: &StealableTaskPool<O>,
699                                          local_worker: &StealableTaskQueue<O>,
700                                          local_worker_id: usize)
701    -> Option<Arc<AsyncTask<StealableTaskPool<O>, O>>> {
702    let task = local_worker
703        .internal
704        .pop();
705    if task.is_some() {
706        //如果工作者有内部任务,则立即返回
707        pool
708            .internal_consume
709            .fetch_add(1, Ordering::Relaxed);
710        task
711    } else {
712        //工作者的内部任务队列为空,则随机从其它工作者的内部任务队列中窃取任务
713        let mut gen = thread_rng();
714        let mut worker_stealers: Vec<&FIFOStealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>> = pool
715            .internal_stealers
716            .iter()
717            .enumerate()
718            .filter_map(|(index, other)| {
719                if index != local_worker_id {
720                    Some(other)
721                } else {
722                    //忽略本地工作者
723                    None
724                }
725            })
726            .collect();
727
728        let remaining_len = local_worker.remaining_internal_capacity();
729        loop {
730            //随机窃取其它工作者的任务队列
731            if worker_stealers.len() == 0 {
732                //所有其它工作者的任务队列都为空,则返回空
733                break;
734            }
735
736            let index = gen.gen_range(0..worker_stealers.len());
737            let worker_stealer = worker_stealers.swap_remove(index);
738
739            match worker_stealer.steal_and_pop(&local_worker.internal,
740                                               |count| {
741                                                   let stealable_len = count / 2;
742                                                   if stealable_len <= remaining_len {
743                                                       //当前工作者内部任务队列的剩余容量足够,则窃取指定的其它工作者的内部任务队列中一半的任务
744                                                       if stealable_len == 0 {
745                                                           1
746                                                       } else {
747                                                           stealable_len
748                                                       }
749                                                   } else {
750                                                       //当前工作者内部任务队列的剩余容量不足够,则从指定的其它工作者的内部任务队列中窃取当前工作者内部任务队列剩余容量的任务
751                                                       remaining_len
752                                                   }
753                                               }) {
754                Err(StealError::Empty) => {
755                    //指定的其它工作者的内部任务队列中没有可窃取的任务,则继续窃取下一个其它工作者的内部任务队列
756                    continue;
757                },
758                Err(StealError::Busy) => {
759                    //需要重试窃取指定的其它工作者的内部任务队列中的任务
760                    continue;
761                },
762                Ok((task, _)) => {
763                    //从从已窃取到的其它工作者内部任务中获取到首个任务,并立即返回
764                    pool.internal_consume.fetch_add(1, Ordering::Relaxed);
765                    return Some(task);
766                },
767            }
768        }
769
770        None
771    }
772}
773
774// 尝试弹出外部任务队列的任务
775#[inline]
776fn try_pop_external<O: Default + 'static>(pool: &StealableTaskPool<O>,
777                                          local_worker: &StealableTaskQueue<O>,
778                                          local_worker_id: usize)
779    -> Option<Arc<AsyncTask<StealableTaskPool<O>, O>>> {
780    let task = local_worker
781        .external
782        .pop();
783    if task.is_some() {
784        //如果工作者有外部任务,则立即返回
785        pool
786            .external_consume
787            .fetch_add(1, Ordering::Relaxed);
788        task
789    } else {
790        //工作者的外部任务队列为空,则从公共任务池中弹出任务
791        let task = try_pop_public(pool, local_worker);
792        if task.is_some() {
793            //如果公共任务池有外部任务,则立即返回
794            pool
795                .external_consume
796                .fetch_add(1, Ordering::Relaxed);
797            task
798        } else {
799            //公共任务池为空,则随机从其它工作者的外部任务队列中窃取任务
800            let mut gen = thread_rng();
801            let mut worker_stealers: Vec<&Stealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>> = pool
802                .external_stealers
803                .iter()
804                .enumerate()
805                .filter_map(|(index, other)| {
806                    if index != local_worker_id {
807                        Some(other)
808                    } else {
809                        //忽略当前工作者
810                        None
811                    }
812                })
813                .collect();
814
815            loop {
816                //随机窃取其它工作者的任务队列
817                if worker_stealers.len() == 0 {
818                    //所有其它工作者的外部任务队列都为空,则返回空
819                    break;
820                }
821
822                let index = gen.gen_range(0..worker_stealers.len());
823                let worker_stealer = worker_stealers.swap_remove(index);
824
825                match worker_stealer.steal_batch_and_pop(&local_worker.external) {
826                    Steal::Success(task) => {
827                        //从从已窃取到的其它工作者外部任务中获取到首个任务,并立即返回
828                        pool.external_consume.fetch_add(1, Ordering::Relaxed);
829                        return Some(task);
830                    },
831                    Steal::Retry => {
832                        //需要重试窃取指定的其它工作者的外部任务队列中的任务
833                        continue;
834                    },
835                    Steal::Empty => {
836                        //指定的其它工作者的外部任务队列中没有可窃取的任务,则继续窃取下一个其它工作者的外部任务队列
837                        continue;
838                    },
839                }
840            }
841
842            None
843        }
844    }
845}
846
847// 尝试弹出公共任务池的任务
848#[inline]
849fn try_pop_public<O: Default + 'static>(pool: &StealableTaskPool<O>,
850                                        local_worker: &StealableTaskQueue<O>)
851    -> Option<Arc<AsyncTask<StealableTaskPool<O>, O>>> {
852    loop {
853        match pool.public.steal_batch_and_pop(&local_worker.external) {
854            Steal::Empty => {
855                //当前公共任务池没有任务
856                return None;
857            },
858            Steal::Retry => {
859                //需要重试窃取公共任务池的任务
860                continue;
861            },
862            Steal::Success(task) => {
863                //从已窃取到的公共任务中获取到首个任务,并立即返回
864                pool.external_consume.fetch_add(1, Ordering::Relaxed);
865                return Some(task);
866            },
867        }
868    }
869}
870
871impl<O: Default + 'static> AsyncTaskPoolExt<O> for StealableTaskPool<O> {
872    #[inline]
873    fn worker_len(&self) -> usize {
874        self.workers.len()
875    }
876
877    #[inline]
878    fn clone_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
879        if let Some(worker) = self.workers.get(self.get_thread_id() & 0xffffffff) {
880            return Some(worker.thread_waker.clone());
881        }
882
883        None
884    }
885}
886
887impl<O: Default + 'static> StealableTaskPool<O> {
888    /// 可窃取的快速工作者任务池
889    pub fn new() -> Self {
890        #[cfg(not(target_arch = "wasm32"))]
891            let size = num_cpus::get_physical() * 2; //默认最大工作者任务池数量是当前cpu物理核的2倍
892        #[cfg(target_arch = "wasm32")]
893            let size = 1; //默认最大工作者任务池数量是1
894        StealableTaskPool::with(size,
895                                0x8000,
896                                [1, 1],
897                                3000)
898    }
899
900    /// 构建指定工作者任务池数量,工作者内部任务队列容量,工作者任务栈容量,任务队列的权重和整理间隔时长的可窃取的快速工作者任务池
901    pub fn with(worker_size: usize,
902                internal_queue_capacity: usize,
903                weights: [u8; 2],
904                interval: usize) -> Self {
905        if worker_size == 0 {
906            //工作者任务池数量无效,则立即抛出异常
907            panic!(
908                "Create WorkerTaskPool failed, worker size: {}, reason: invalid worker size",
909                worker_size
910            );
911        }
912        if interval == 0 {
913            panic!(
914                "Create WorkerTaskPool failed, interval: {}, reason: invalid interval",
915                worker_size
916            );
917        }
918
919        let public = Injector::new();
920        let mut workers = Vec::with_capacity(worker_size);
921        let mut internal_stealers = Vec::with_capacity(worker_size);
922        let mut external_stealers = Vec::with_capacity(worker_size);
923        for _ in 0..worker_size {
924            //初始化指定初始作者任务池数量的工作者任务池和窃取者
925            let thread_waker = Arc::new((AtomicBool::new(false), Mutex::new(()), Condvar::new()));
926            let (worker,
927                internal_stealer,
928                external_stealer) =
929                StealableTaskQueue::new(internal_queue_capacity,
930                                        thread_waker);
931            workers.push(worker);
932            internal_stealers.push(internal_stealer);
933            external_stealers.push(external_stealer);
934        }
935        let internal_consume = AtomicUsize::new(0);
936        let internal_produce = AtomicUsize::new(0);
937        let internal_traffic_statistics = AtomicUsize::new(0);
938        let external_consume = AtomicUsize::new(0);
939        let external_produce = AtomicUsize::new(0);
940        let external_traffic_statistics = AtomicUsize::new(0);
941        let clock = Clock::new();
942        let last_time = UnsafeCell::new(clock.recent());
943
944        StealableTaskPool {
945            public,
946            workers,
947            internal_stealers,
948            external_stealers,
949            internal_consume,
950            internal_produce,
951            internal_traffic_statistics,
952            external_consume,
953            external_produce,
954            external_traffic_statistics,
955            weights,
956            clock,
957            interval,
958            last_time,
959        }
960    }
961}
962
963///
964/// 异步多线程任务运行时,支持运行时线程伸缩
965///
966pub struct MultiTaskRuntime<
967    O: Default + 'static = (),
968    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = StealableTaskPool<O>,
969>(
970    Arc<(
971        usize,                                                  //运行时唯一id
972        Arc<P>,                                                 //异步任务池
973        Option<
974            Vec<(
975                Sender<(usize, AsyncTimingTask<P, O>)>,
976                Arc<AsyncTaskTimerByNotCancel<P, O>>,
977            )>,
978        >,                                                      //休眠的异步任务生产者和本地定时器
979        AtomicUsize,                                            //定时任务计数器
980        Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>, //待唤醒的工作者唤醒器队列
981        AtomicUsize,                                            //定时器生产计数
982        AtomicUsize,                                            //定时器消费计数
983    )>,
984);
985
986unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Send
987    for MultiTaskRuntime<O, P>
988{
989}
990unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Sync
991    for MultiTaskRuntime<O, P>
992{
993}
994
995impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Clone
996    for MultiTaskRuntime<O, P>
997{
998    fn clone(&self) -> Self {
999        MultiTaskRuntime(self.0.clone())
1000    }
1001}
1002
1003impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>> AsyncRuntime<O>
1004    for MultiTaskRuntime<O, P>
1005{
1006    type Pool = P;
1007
1008    /// 共享运行时内部任务池
1009    fn shared_pool(&self) -> Arc<Self::Pool> {
1010        (self.0).1.clone()
1011    }
1012
1013    /// 获取当前异步运行时的唯一id
1014    fn get_id(&self) -> usize {
1015        (self.0).0
1016    }
1017
1018    /// 获取当前异步运行时待处理任务数量
1019    fn wait_len(&self) -> usize {
1020        (self.0)
1021            .5
1022            .load(Ordering::Relaxed)
1023            .checked_sub((self.0).6.load(Ordering::Relaxed))
1024            .unwrap_or(0)
1025    }
1026
1027    /// 获取当前异步运行时任务数量
1028    fn len(&self) -> usize {
1029        (self.0).1.len()
1030    }
1031
1032    /// 分配异步任务的唯一id
1033    fn alloc<R: 'static>(&self) -> TaskId {
1034        TaskId(UnsafeCell::new((TaskHandle::<R>::default().into_raw() as u128) << 64 | self.get_id() as u128 & 0xffffffffffffffff))
1035    }
1036
1037    /// 派发一个指定的异步任务到异步运行时
1038    fn spawn<F>(&self, future: F) -> Result<TaskId>
1039    where
1040        F: Future<Output = O> + Send + 'static,
1041    {
1042        let task_id = self.alloc::<F::Output>();
1043        if let Err(e) = self.spawn_by_id(task_id.clone(), future) {
1044            return Err(e);
1045        }
1046
1047        Ok(task_id)
1048    }
1049
1050    /// 派发一个异步任务到本地异步运行时,如果本地没有本异步运行时,则会派发到当前运行时中
1051    fn spawn_local<F>(&self, future: F) -> Result<TaskId>
1052        where
1053            F: Future<Output=O> + Send + 'static {
1054        let task_id = self.alloc::<F::Output>();
1055        if let Err(e) = self.spawn_local_by_id(task_id.clone(), future) {
1056            return Err(e);
1057        }
1058
1059        Ok(task_id)
1060    }
1061
1062    /// 派发一个指定优先级的异步任务到异步运行时
1063    fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
1064        where
1065            F: Future<Output=O> + Send + 'static {
1066        let task_id = self.alloc::<F::Output>();
1067        if let Err(e) = self.spawn_priority_by_id(task_id.clone(), priority, future) {
1068            return Err(e);
1069        }
1070
1071        Ok(task_id)
1072    }
1073
1074    /// 派发一个异步任务到异步运行时,并立即让出任务的当前运行
1075    fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
1076        where
1077            F: Future<Output=O> + Send + 'static {
1078        let task_id = self.alloc::<F::Output>();
1079        if let Err(e) = self.spawn_yield_by_id(task_id.clone(), future) {
1080            return Err(e);
1081        }
1082
1083        Ok(task_id)
1084    }
1085
1086    /// 派发一个在指定时间后执行的异步任务到异步运行时,时间单位ms
1087    fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
1088    where
1089        F: Future<Output = O> + Send + 'static,
1090    {
1091        let task_id = self.alloc::<F::Output>();
1092        if let Err(e) = self.spawn_timing_by_id(task_id.clone(), future, time) {
1093            return Err(e);
1094        }
1095
1096        Ok(task_id)
1097    }
1098
1099    /// 派发一个指定任务唯一id的异步任务到异步运行时
1100    fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
1101        where
1102            F: Future<Output=O> + Send + 'static {
1103        let result = {
1104            (self.0).1.push(Arc::new(AsyncTask::new(
1105                task_id,
1106                (self.0).1.clone(),
1107                DEFAULT_MAX_LOW_PRIORITY_BOUNDED,
1108                Some(future.boxed()),
1109            )))
1110        };
1111
1112        if let Some(worker_waker) = (self.0).4.pop() {
1113            //有待唤醒的工作者
1114            let (is_sleep, lock, condvar) = &*worker_waker;
1115            let _locked = lock.lock();
1116            if is_sleep.load(Ordering::Relaxed) {
1117                //待唤醒的工作者,正在休眠,则立即唤醒此工作者
1118                is_sleep.store(false, Ordering::SeqCst); //设置为未休眠
1119                condvar.notify_one();
1120            }
1121        }
1122
1123        result
1124    }
1125
1126    fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
1127        where
1128            F: Future<Output=O> + Send + 'static {
1129        (self.0).1.push_local(Arc::new(AsyncTask::new(
1130            task_id,
1131            (self.0).1.clone(),
1132            DEFAULT_HIGH_PRIORITY_BOUNDED,
1133            Some(future.boxed()),
1134        )))
1135    }
1136
1137    /// 派发一个指定任务唯一id和任务优先级的异步任务到异步运行时
1138    fn spawn_priority_by_id<F>(&self,
1139                               task_id: TaskId,
1140                               priority: usize,
1141                               future: F) -> Result<()>
1142        where
1143            F: Future<Output=O> + Send + 'static {
1144        let result = {
1145            (self.0).1.push_priority(priority, Arc::new(AsyncTask::new(
1146                task_id,
1147                (self.0).1.clone(),
1148                priority,
1149                Some(future.boxed()),
1150            )))
1151        };
1152
1153        if let Some(worker_waker) = (self.0).4.pop() {
1154            //有待唤醒的工作者
1155            let (is_sleep, lock, condvar) = &*worker_waker;
1156            let _locked = lock.lock();
1157            if is_sleep.load(Ordering::Relaxed) {
1158                //待唤醒的工作者,正在休眠,则立即唤醒此工作者
1159                is_sleep.store(false, Ordering::SeqCst); //设置为未休眠
1160                condvar.notify_one();
1161            }
1162        }
1163
1164        result
1165    }
1166
1167    /// 派发一个指定任务唯一id的异步任务到异步运行时,并立即让出任务的当前运行
1168    #[inline]
1169    fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
1170        where
1171            F: Future<Output=O> + Send + 'static {
1172        self.spawn_priority_by_id(task_id,
1173                                  DEFAULT_HIGH_PRIORITY_BOUNDED,
1174                                  future)
1175    }
1176
1177    /// 派发一个指定任务唯一id和在指定时间后执行的异步任务到异步运行时,时间单位ms
1178    fn spawn_timing_by_id<F>(&self,
1179                             task_id: TaskId,
1180                             future: F,
1181                             time: usize) -> Result<()>
1182        where
1183            F: Future<Output=O> + Send + 'static {
1184        let rt = self.clone();
1185        self.spawn_by_id(task_id, async move {
1186            if let Some(timers) = &(rt.0).2 {
1187                //为定时器设置定时异步任务
1188                let id = (rt.0).1.get_thread_id() & 0xffffffff;
1189                let (_, timer) = &timers[id];
1190                timer.set_timer(
1191                    AsyncTimingTask::WaitRun(Arc::new(AsyncTask::new(
1192                        rt.alloc::<F::Output>(),
1193                        (rt.0).1.clone(),
1194                        DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
1195                        Some(future.boxed()),
1196                    ))),
1197                    time,
1198                );
1199
1200                (rt.0).5.fetch_add(1, Ordering::Relaxed);
1201            }
1202
1203            Default::default()
1204        })
1205    }
1206
1207    /// 挂起指定唯一id的异步任务
1208    fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output> {
1209        task_id.set_waker::<Output>(waker);
1210        Poll::Pending
1211    }
1212
1213    /// 唤醒指定唯一id的异步任务
1214    fn wakeup<Output: 'static>(&self, task_id: &TaskId) {
1215        task_id.wakeup::<Output>();
1216    }
1217
1218    /// 挂起当前异步运行时的当前任务,并在指定的其它运行时上派发一个指定的异步任务,等待其它运行时上的异步任务完成后,唤醒当前运行时的当前任务,并返回其它运行时上的异步任务的值
1219    fn wait<V: Send + 'static>(&self) -> AsyncWait<V> {
1220        AsyncWait(self.wait_any(2))
1221    }
1222
1223    /// 挂起当前异步运行时的当前任务,并在多个其它运行时上执行多个其它任务,其中任意一个任务完成,则唤醒当前运行时的当前任务,并返回这个已完成任务的值,而其它未完成的任务的值将被忽略
1224    fn wait_any<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
1225        let (producor, consumer) = async_bounded(capacity);
1226
1227        AsyncWaitAny {
1228            capacity,
1229            producor,
1230            consumer,
1231        }
1232    }
1233
1234    /// 挂起当前异步运行时的当前任务,并在多个其它运行时上执行多个其它任务,任务返回后需要通过用户指定的检查回调进行检查,其中任意一个任务检查通过,则唤醒当前运行时的当前任务,并返回这个已完成任务的值,而其它未完成或未检查通过的任务的值将被忽略,如果所有任务都未检查通过,则强制唤醒当前运行时的当前任务
1235    fn wait_any_callback<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
1236        let (producor, consumer) = async_bounded(capacity);
1237
1238        AsyncWaitAnyCallback {
1239            capacity,
1240            producor,
1241            consumer,
1242        }
1243    }
1244
1245    /// 构建用于派发多个异步任务到指定运行时的映射归并,需要指定映射归并的容量
1246    fn map_reduce<V: Send + 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
1247        let (producor, consumer) = async_bounded(capacity);
1248
1249        AsyncMapReduce {
1250            count: 0,
1251            capacity,
1252            producor,
1253            consumer,
1254        }
1255    }
1256
1257    /// 挂起当前异步运行时的当前任务,等待指定的时间后唤醒当前任务
1258    fn timeout(&self, timeout: usize) -> BoxFuture<'static, ()> {
1259        let rt = self.clone();
1260
1261        if let Some(timers) = &(self.0).2 {
1262            //有本地定时器,则异步等待指定时间
1263            match PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
1264                //将休眠的异步任务投递到当前派发线程的定时器内
1265                let thread_id = unsafe { *thread_id.get() };
1266                let index = thread_id & 0xffffffff;
1267                if index > timers.len() {
1268                    //当前线程还未初始化运行时的线程id,说明当前线程不是当前多线程运行时的所属线程
1269                    TimerTaskProducor::Foreign(timers[(self.0).3.load(Ordering::Relaxed) % timers.len()].0.clone())
1270                } else {
1271                    TimerTaskProducor::Local(timers[index].1.clone())
1272                }
1273            }) {
1274                Err(_) => {
1275                    panic!("Multi thread runtime timeout failed, reason: local thread id not match")
1276                }
1277                Ok(producor) => match producor {
1278                    TimerTaskProducor::Local(timer) => {
1279                        LocalAsyncWaitTimeout::new(rt, timer, timeout).boxed()
1280                    },
1281                    TimerTaskProducor::Foreign(producor) => {
1282                        AsyncWaitTimeout::new(rt, producor, timeout).boxed()
1283                    },
1284                },
1285            }
1286        } else {
1287            //没有本地定时器,则同步休眠指定时间
1288            async move {
1289                thread::sleep(Duration::from_millis(timeout as u64));
1290            }
1291            .boxed()
1292        }
1293    }
1294
1295    /// 立即让出当前任务的执行
1296    fn yield_now(&self) -> BoxFuture<'static, ()> {
1297        async move {
1298            YieldNow(false).await;
1299        }.boxed()
1300    }
1301
1302    /// 生成一个异步管道,输入指定流,输入流的每个值通过过滤器生成输出流的值
1303    fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> BoxStream<'static, FO>
1304    where
1305        S: Stream<Item = SO> + Send + 'static,
1306        SO: Send + 'static,
1307        F: FnMut(SO) -> AsyncPipelineResult<FO> + Send + 'static,
1308        FO: Send + 'static,
1309    {
1310        let output = stream! {
1311            for await value in input {
1312                match filter(value) {
1313                    AsyncPipelineResult::Disconnect => {
1314                        //立即中止管道
1315                        break;
1316                    },
1317                    AsyncPipelineResult::Filtered(result) => {
1318                        yield result;
1319                    },
1320                }
1321            }
1322        };
1323
1324        output.boxed()
1325    }
1326
1327    /// 关闭异步运行时,返回请求关闭是否成功
1328    fn close(&self) -> bool {
1329        false
1330    }
1331}
1332
1333impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>> AsyncRuntimeExt<O>
1334    for MultiTaskRuntime<O, P>
1335{
1336    fn spawn_with_context<F, C>(&self, task_id: TaskId, future: F, context: C) -> Result<()>
1337    where
1338        F: Future<Output = O> + Send + 'static,
1339        C: 'static,
1340    {
1341        let task = Arc::new(AsyncTask::with_context(
1342            task_id,
1343            (self.0).1.clone(),
1344            DEFAULT_MAX_LOW_PRIORITY_BOUNDED,
1345            Some(future.boxed()),
1346            context,
1347        ));
1348        let result = (self.0).1.push(task);
1349
1350        if let Some(worker_waker) = (self.0).4.pop() {
1351            //有待唤醒的工作者
1352            let (is_sleep, lock, condvar) = &*worker_waker;
1353            let _locked = lock.lock();
1354            if is_sleep.load(Ordering::Relaxed) {
1355                //待唤醒的工作者,正在休眠,则立即唤醒此工作者
1356                is_sleep.store(false, Ordering::SeqCst); //设置为未休眠
1357                condvar.notify_one();
1358            }
1359        }
1360
1361        result
1362    }
1363
1364    fn spawn_timing_with_context<F, C>(
1365        &self,
1366        task_id: TaskId,
1367        future: F,
1368        context: C,
1369        time: usize,
1370    ) -> Result<()>
1371    where
1372        F: Future<Output = O> + Send + 'static,
1373        C: Send + 'static,
1374    {
1375        let rt = self.clone();
1376        self.spawn_by_id(task_id, async move {
1377            if let Some(timers) = &(rt.0).2 {
1378                //为定时器设置定时异步任务
1379                let id = (rt.0).1.get_thread_id() & 0xffffffff;
1380                let (_, timer) = &timers[id];
1381                timer.set_timer(
1382                    AsyncTimingTask::WaitRun(Arc::new(AsyncTask::with_context(
1383                        rt.alloc::<F::Output>(),
1384                        (rt.0).1.clone(),
1385                        DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
1386                        Some(future.boxed()),
1387                        context,
1388                    ))),
1389                    time,
1390                );
1391
1392                (rt.0).5.fetch_add(1, Ordering::Relaxed);
1393            }
1394
1395            Default::default()
1396        })
1397    }
1398
1399    fn block_on<F>(&self, future: F) -> Result<F::Output>
1400    where
1401        F: Future + Send + 'static,
1402        <F as Future>::Output: Default + Send + 'static,
1403    {
1404        //从本地线程获取当前异步运行时
1405        if let Some(local_rt) = local_async_runtime::<F::Output>() {
1406            //本地线程绑定了异步运行时
1407            if local_rt.get_id() == self.get_id() {
1408                //如果是相同运行时,则立即返回错误
1409                return Err(Error::new(
1410                    ErrorKind::WouldBlock,
1411                    format!("Block on failed, reason: would block"),
1412                ));
1413            }
1414        }
1415
1416        let (sender, receiver) = bounded(1);
1417        if let Err(e) = self.spawn(async move {
1418            //在指定运行时中执行,并返回结果
1419            let r = future.await;
1420            sender.send(r);
1421
1422            Default::default()
1423        }) {
1424            return Err(Error::new(
1425                ErrorKind::Other,
1426                format!("Block on failed, reason: {:?}", e),
1427            ));
1428        }
1429
1430        //同步阻塞等待异步任务返回
1431        match receiver.recv() {
1432            Err(e) => Err(Error::new(
1433                ErrorKind::Other,
1434                format!("Block on failed, reason: {:?}", e),
1435            )),
1436            Ok(result) => Ok(result),
1437        }
1438    }
1439}
1440
1441impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>
1442    MultiTaskRuntime<O, P>
1443{
1444    /// 获取当前运行时可新增的工作者数量
1445    pub fn idler_len(&self) -> usize {
1446        (self.0).1.idler_len()
1447    }
1448
1449    /// 获取当前运行时的工作者数量
1450    pub fn worker_len(&self) -> usize {
1451        (self.0).1.worker_len()
1452    }
1453
1454    /// 获取当前运行时缓冲区的任务数量,缓冲区的任务暂时没有分配给工作者
1455    pub fn buffer_len(&self) -> usize {
1456        (self.0).1.buffer_len()
1457    }
1458
1459    /// 获取当前多线程异步运行时的本地异步运行时
1460    pub fn to_local_runtime(&self) -> LocalAsyncRuntime<O> {
1461        LocalAsyncRuntime {
1462            inner: self.as_raw(),
1463            get_id_func: MultiTaskRuntime::<O, P>::get_id_raw,
1464            spawn_func: MultiTaskRuntime::<O, P>::spawn_raw,
1465            spawn_timing_func: MultiTaskRuntime::<O, P>::spawn_timing_raw,
1466            timeout_func: MultiTaskRuntime::<O, P>::timeout_raw,
1467        }
1468    }
1469
1470    /// 获取当前多线程异步运行时的指针
1471    #[inline]
1472    pub(crate) fn as_raw(&self) -> *const () {
1473        Arc::into_raw(self.0.clone()) as *const ()
1474    }
1475
1476    // 获取指定指针的单线程异步运行时
1477    #[inline]
1478    pub(crate) fn from_raw(raw: *const ()) -> Self {
1479        let inner = unsafe {
1480            Arc::from_raw(
1481                raw as *const (
1482                    usize,
1483                    Arc<P>,
1484                    Option<
1485                        Vec<(
1486                            Sender<(usize, AsyncTimingTask<P, O>)>,
1487                            Arc<AsyncTaskTimerByNotCancel<P, O>>,
1488                        )>,
1489                    >,
1490                    AtomicUsize,
1491                    Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>,
1492                    AtomicUsize,
1493                    AtomicUsize,
1494                ),
1495            )
1496        };
1497        MultiTaskRuntime(inner)
1498    }
1499
1500    // 获取当前异步运行时的唯一id
1501    pub(crate) fn get_id_raw(raw: *const ()) -> usize {
1502        let rt = MultiTaskRuntime::<O, P>::from_raw(raw);
1503        let id = rt.get_id();
1504        Arc::into_raw(rt.0); //避免提前释放
1505        id
1506    }
1507
1508    // 派发一个指定的异步任务到异步运行时
1509    pub(crate) fn spawn_raw<F>(raw: *const (), future: F) -> Result<()>
1510    where
1511        F: Future<Output = O> + Send + 'static,
1512    {
1513        let rt = MultiTaskRuntime::<O, P>::from_raw(raw);
1514        let result = rt.spawn_by_id(rt.alloc::<F::Output>(), future);
1515        Arc::into_raw(rt.0); //避免提前释放
1516        result
1517    }
1518
1519    // 定时派发一个指定的异步任务到异步运行时
1520    pub(crate) fn spawn_timing_raw(
1521        raw: *const (),
1522        future: BoxFuture<'static, O>,
1523        timeout: usize,
1524    ) -> Result<()> {
1525        let rt = MultiTaskRuntime::<O, P>::from_raw(raw);
1526        let result = rt.spawn_timing_by_id(rt.alloc::<O>(), future, timeout);
1527        Arc::into_raw(rt.0); //避免提前释放
1528        result
1529    }
1530
1531    // 挂起当前异步运行时的当前任务,等待指定的时间后唤醒当前任务
1532    pub(crate) fn timeout_raw(raw: *const (), timeout: usize) -> BoxFuture<'static, ()> {
1533        let rt = MultiTaskRuntime::<O, P>::from_raw(raw);
1534        let boxed = rt.timeout(timeout);
1535        Arc::into_raw(rt.0); //避免提前释放
1536        boxed
1537    }
1538}
1539
1540///
1541/// 异步多线程任务运行时构建器
1542///
1543pub struct MultiTaskRuntimeBuilder<
1544    O: Default + 'static = (),
1545    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = StealableTaskPool<O>,
1546> {
1547    pool: P,                 //异步多线程任务运行时
1548    prefix: String,          //工作者线程名称前缀
1549    init: usize,             //初始工作者数量
1550    min: usize,              //最少工作者数量
1551    max: usize,              //最大工作者数量
1552    stack_size: usize,       //工作者线程栈大小
1553    timeout: u64,            //工作者空闲时最长休眠时间
1554    interval: Option<usize>, //工作者定时器间隔
1555    marker: PhantomData<O>,
1556}
1557
1558unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Send
1559    for MultiTaskRuntimeBuilder<O, P>
1560{
1561}
1562unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Sync
1563    for MultiTaskRuntimeBuilder<O, P>
1564{
1565}
1566
1567impl<O: Default + 'static> Default for MultiTaskRuntimeBuilder<O> {
1568    //默认构建可窃取可伸缩的多线程运行时
1569    fn default() -> Self {
1570        #[cfg(not(target_arch = "wasm32"))]
1571        let core_len = num_cpus::get(); //默认的工作者的数量为本机逻辑核数
1572        #[cfg(target_arch = "wasm32")]
1573        let core_len = 1; //默认的工作者的数量为1
1574        let pool = StealableTaskPool::with(core_len,
1575                                           65535,
1576                                           [1, 1],
1577                                           3000);
1578        MultiTaskRuntimeBuilder::new(pool)
1579            .thread_stack_size(2 * 1024 * 1024)
1580            .set_timer_interval(1)
1581    }
1582}
1583
1584impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>
1585    MultiTaskRuntimeBuilder<O, P>
1586{
1587    /// 构建指定任务池、线程名前缀、初始线程数量、最少线程数量、最大线程数量、线程栈大小、线程空闲时最长休眠时间和是否使用本地定时器的多线程任务池
1588    pub fn new(mut pool: P) -> Self {
1589        #[cfg(not(target_arch = "wasm32"))]
1590        let core_len = num_cpus::get(); //获取本机cpu逻辑核数
1591        #[cfg(target_arch = "wasm32")]
1592        let core_len = 1; //默认为1
1593
1594        MultiTaskRuntimeBuilder {
1595            pool,
1596            prefix: DEFAULT_WORKER_THREAD_PREFIX.to_string(),
1597            init: core_len,
1598            min: core_len,
1599            max: core_len,
1600            stack_size: DEFAULT_THREAD_STACK_SIZE,
1601            timeout: DEFAULT_WORKER_THREAD_SLEEP_TIME,
1602            interval: None,
1603            marker: PhantomData,
1604        }
1605    }
1606
1607    /// 设置工作者线程名称前缀
1608    pub fn thread_prefix(mut self, prefix: &str) -> Self {
1609        self.prefix = prefix.to_string();
1610        self
1611    }
1612
1613    /// 设置工作者线程栈大小
1614    pub fn thread_stack_size(mut self, stack_size: usize) -> Self {
1615        self.stack_size = stack_size;
1616        self
1617    }
1618
1619    /// 设置初始工作者数量
1620    pub fn init_worker_size(mut self, mut init: usize) -> Self {
1621        if init == 0 {
1622            //初始线程数量过小,则设置默认的初始线程数量
1623            init = DEFAULT_INIT_WORKER_SIZE;
1624        }
1625
1626        self.init = init;
1627        self
1628    }
1629
1630    /// 设置最小工作者数量和最大工作者数量
1631    pub fn set_worker_limit(mut self, mut min: usize, mut max: usize) -> Self {
1632        if self.init > max {
1633            //初始线程数量大于最大线程数量,则设置最大线程数量为初始线程数量
1634            max = self.init;
1635        }
1636
1637        if min == 0 || min > max {
1638            //最少线程数量无效,则设置最少线程数量为最大线程数量
1639            min = max;
1640        }
1641
1642        self.min = min;
1643        self.max = max;
1644        self
1645    }
1646
1647    /// 设置工作者空闲时最大休眠时长
1648    pub fn set_timeout(mut self, timeout: u64) -> Self {
1649        self.timeout = timeout;
1650        self
1651    }
1652
1653    /// 设置工作者定时器间隔
1654    pub fn set_timer_interval(mut self, interval: usize) -> Self {
1655        self.interval = Some(interval);
1656        self
1657    }
1658
1659    /// 构建并启动多线程异步运行时
1660    pub fn build(mut self) -> MultiTaskRuntime<O, P> {
1661        //构建多线程任务运行时的本地定时器和定时异步任务生产者
1662        let interval = self.interval;
1663        let mut timers = if let Some(_) = interval {
1664            Some(Vec::with_capacity(self.max))
1665        } else {
1666            None
1667        };
1668        for _ in 0..self.max {
1669            //初始化指定的最大线程数量的本地定时器和定时异步任务生产者,定时器不会在关闭工作者时被移除
1670            if let Some(vec) = &mut timers {
1671                let timer = AsyncTaskTimerByNotCancel::new();
1672                let producor = timer.producor.clone();
1673                let timer = Arc::new(timer);
1674                vec.push((producor, timer));
1675            };
1676        }
1677
1678        //构建多线程任务运行时
1679        let rt_uid = alloc_rt_uid();
1680        let waits = Arc::new(ArrayQueue::new(self.max));
1681        let mut pool = self.pool;
1682        pool.set_waits(waits.clone()); //设置待唤醒的工作者唤醒器队列
1683        let pool = Arc::new(pool);
1684        let runtime = MultiTaskRuntime(Arc::new((
1685            rt_uid,
1686            pool,
1687            timers,
1688            AtomicUsize::new(0),
1689            waits,
1690            AtomicUsize::new(0),
1691            AtomicUsize::new(0),
1692        )));
1693
1694        //构建初始化线程数量的线程构建器
1695        let mut builders = Vec::with_capacity(self.init);
1696        for index in 0..self.init {
1697            let builder = Builder::new()
1698                .name(self.prefix.clone() + "-" + index.to_string().as_str())
1699                .stack_size(self.stack_size);
1700            builders.push(builder);
1701        }
1702
1703        //启动工作者线程
1704        let min = self.min;
1705        for index in 0..builders.len() {
1706            let builder = builders.remove(0);
1707            let runtime = runtime.clone();
1708            let timeout = self.timeout;
1709            let timer = if let Some(timers) = &(runtime.0).2 {
1710                let (_, timer) = &timers[index];
1711                Some(timer.clone())
1712            } else {
1713                None
1714            };
1715
1716            spawn_worker_thread(builder, index, runtime, min, timeout, interval, timer);
1717        }
1718
1719        runtime
1720    }
1721}
1722
1723//分派工作者线程,并开始工作
1724fn spawn_worker_thread<
1725    O: Default + 'static,
1726    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
1727>(
1728    builder: Builder,
1729    index: usize,
1730    runtime: MultiTaskRuntime<O, P>,
1731    min: usize,
1732    timeout: u64,
1733    interval: Option<usize>,
1734    timer: Option<Arc<AsyncTaskTimerByNotCancel<P, O>>>,
1735) {
1736    if let Some(timer) = timer {
1737        //设置了定时器
1738        let rt_uid = runtime.get_id();
1739        let _ = builder.spawn(move || {
1740            //设置线程本地唯一id
1741            if let Err(e) = PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| unsafe {
1742                *thread_id.get() = rt_uid << 32 | index & 0xffffffff;
1743            }) {
1744                panic!(
1745                    "Multi thread runtime startup failed, thread id: {:?}, reason: {:?}",
1746                    index, e
1747                );
1748            }
1749
1750            //绑定运行时到线程
1751            let runtime_copy = runtime.clone();
1752            match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |rt| {
1753                let raw = Arc::into_raw(Arc::new(runtime_copy.to_local_runtime()))
1754                    as *mut LocalAsyncRuntime<O> as *mut ();
1755                rt.store(raw, Ordering::Relaxed);
1756            }) {
1757                Err(e) => {
1758                    panic!("Bind multi runtime to local thread failed, reason: {:?}", e);
1759                }
1760                Ok(_) => (),
1761            }
1762
1763            //执行有定时器的工作循环
1764            timer_work_loop(
1765                runtime,
1766                index,
1767                min,
1768                timeout,
1769                interval.unwrap() as u64,
1770                timer,
1771            );
1772        });
1773    } else {
1774        //未设置定时器
1775        let rt_uid = runtime.get_id();
1776        let _ = builder.spawn(move || {
1777            //设置线程本地唯一id
1778            if let Err(e) = PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| unsafe {
1779                *thread_id.get() = rt_uid << 32 | index & 0xffffffff;
1780            }) {
1781                panic!(
1782                    "Multi thread runtime startup failed, thread id: {:?}, reason: {:?}",
1783                    index, e
1784                );
1785            }
1786
1787            //绑定运行时到线程
1788            let runtime_copy = runtime.clone();
1789            match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |rt| {
1790                let raw = Arc::into_raw(Arc::new(runtime_copy.to_local_runtime()))
1791                    as *mut LocalAsyncRuntime<O> as *mut ();
1792                rt.store(raw, Ordering::Relaxed);
1793            }) {
1794                Err(e) => {
1795                    panic!("Bind multi runtime to local thread failed, reason: {:?}", e);
1796                }
1797                Ok(_) => (),
1798            }
1799
1800            //执行无定时器的工作循环
1801            work_loop(runtime, index, min, timeout);
1802        });
1803    }
1804}
1805
1806//线程工作循环
1807fn timer_work_loop<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>(
1808    runtime: MultiTaskRuntime<O, P>,
1809    index: usize,
1810    min: usize,
1811    sleep_timeout: u64,
1812    timer_interval: u64,
1813    timer: Arc<AsyncTaskTimerByNotCancel<P, O>>,
1814) {
1815    //初始化当前线程的线程id和线程活动状态
1816    let pool = (runtime.0).1.clone();
1817    let worker_waker = pool.clone_thread_waker().unwrap();
1818
1819    let mut sleep_count = 0; //连续休眠计数器
1820    let clock = Clock::new();
1821    loop {
1822        //设置新的定时异步任务,并唤醒已到期的定时异步任务
1823        let timer_run_millis = clock.recent(); //重置定时器运行时长
1824        let mut pop_len = 0;
1825        (runtime.0)
1826            .5
1827            .fetch_add(timer.consume(),
1828                       Ordering::Relaxed);
1829        loop {
1830            let current_time = timer.is_require_pop();
1831            if let Some(current_time) = current_time {
1832                //当前有到期的定时异步任务,则开始处理到期的所有定时异步任务
1833                loop {
1834                    let timed_out = timer.pop(current_time);
1835                    if let Some(timing_task) = timed_out {
1836                        match timing_task {
1837                            AsyncTimingTask::Pended(expired) => {
1838                                //唤醒休眠的异步任务,不需要立即在本工作者中执行,因为休眠的异步任务无法取消
1839                                runtime.wakeup::<O>(&expired);
1840                            }
1841                            AsyncTimingTask::WaitRun(expired) => {
1842                                //执行到期的定时异步任务,需要立即在本工作者中执行,因为定时异步任务可以取消
1843                                (runtime.0)
1844                                    .1
1845                                    .push_priority(DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
1846                                                   expired);
1847                                if let Some(task) = pool.try_pop() {
1848                                    sleep_count = 0; //重置连续休眠次数
1849                                    run_task(&runtime, task);
1850                                }
1851                            }
1852                        }
1853                        pop_len += 1;
1854
1855                        if let Some(task) = pool.try_pop() {
1856                            //执行当前工作者任务池中的异步任务,避免定时异步任务占用当前工作者的所有工作时间
1857                            sleep_count = 0; //重置连续休眠次数
1858                            run_task(&runtime, task);
1859                        }
1860                    } else {
1861                        //当前所有的到期任务已处理完,则退出本次定时异步任务处理
1862                        break;
1863                    }
1864                }
1865            } else {
1866                //当前没有到期的定时异步任务,则退出本次定时异步任务处理
1867                break;
1868            }
1869        }
1870        (runtime.0)
1871            .6
1872            .fetch_add(pop_len,
1873                       Ordering::Relaxed);
1874
1875        //继续执行当前工作者任务池中的异步任务
1876        match pool.try_pop() {
1877            None => {
1878                if runtime.len() > 0 {
1879                    //确认当前还有任务需要处理,可能还没分配到当前工作者,则当前工作者继续工作
1880                    continue;
1881                }
1882
1883                //无任务,则准备休眠
1884                {
1885                    let (is_sleep, lock, condvar) = &*worker_waker;
1886                    let mut locked = lock.lock();
1887
1888                    //设置当前为休眠状态
1889                    is_sleep.store(true, Ordering::SeqCst);
1890
1891                    //获取休眠的实际时长
1892                    let diff_time = clock
1893                        .recent()
1894                        .duration_since(timer_run_millis)
1895                        .as_millis() as u64; //获取定时器运行时长
1896                    let real_timeout = if timer.len() == 0 {
1897                        //当前定时器没有未到期的任务,则休眠指定时长
1898                        sleep_timeout
1899                    } else {
1900                        //当前定时器还有未到期的任务,则计算需要休眠的时长
1901                        if diff_time >= timer_interval {
1902                            //定时器内部时间与当前时间差距过大,则忽略休眠,并继续工作
1903                            continue;
1904                        } else {
1905                            //定时器内部时间与当前时间差距不大,则休眠差值时间
1906                            timer_interval - diff_time
1907                        }
1908                    };
1909
1910                    //记录待唤醒的工作者唤醒器,用于有新任务时唤醒对应的工作者
1911                    (runtime.0).4.push(worker_waker.clone());
1912
1913                    //让当前工作者休眠,等待有任务时被唤醒或超时后自动唤醒
1914                    if condvar
1915                        .wait_for(&mut locked, Duration::from_millis(real_timeout))
1916                        .timed_out()
1917                    {
1918                        //条件超时唤醒,则设置状态为未休眠
1919                        is_sleep.store(false, Ordering::SeqCst);
1920                        //记录连续休眠次数,因为任务导致的唤醒不会计数
1921                        sleep_count += 1;
1922                    }
1923                }
1924            }
1925            Some(task) => {
1926                //有任务,则执行
1927                sleep_count = 0; //重置连续休眠次数
1928                run_task(&runtime, task);
1929            }
1930        }
1931    }
1932
1933    //关闭当前工作者的任务池
1934    (runtime.0).1.close_worker();
1935    warn!(
1936        "Worker of runtime closed, runtime: {}, worker: {}, thread: {:?}",
1937        runtime.get_id(),
1938        index,
1939        thread::current()
1940    );
1941}
1942
1943//线程工作循环
1944fn work_loop<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>(
1945    runtime: MultiTaskRuntime<O, P>,
1946    index: usize,
1947    min: usize,
1948    sleep_timeout: u64,
1949) {
1950    //初始化当前线程的线程id和线程活动状态
1951    let pool = (runtime.0).1.clone();
1952    let worker_waker = pool.clone_thread_waker().unwrap();
1953
1954    let mut sleep_count = 0; //连续休眠计数器
1955    loop {
1956        match pool.try_pop() {
1957            None => {
1958                //无任务,则准备休眠
1959                if runtime.len() > 0 {
1960                    //确认当前还有任务需要处理,可能还没分配到当前工作者,则当前工作者继续工作
1961                    continue;
1962                }
1963
1964                {
1965                    let (is_sleep, lock, condvar) = &*worker_waker;
1966                    let mut locked = lock.lock();
1967
1968                    //设置当前为休眠状态
1969                    is_sleep.store(true, Ordering::SeqCst);
1970
1971                    //记录待唤醒的工作者唤醒器,用于有新任务时唤醒对应的工作者
1972                    (runtime.0).4.push(worker_waker.clone());
1973
1974                    //让当前工作者休眠,等待有任务时被唤醒或超时后自动唤醒
1975                    if condvar
1976                        .wait_for(&mut locked, Duration::from_millis(sleep_timeout))
1977                        .timed_out()
1978                    {
1979                        //条件超时唤醒,则设置状态为未休眠
1980                        is_sleep.store(false, Ordering::SeqCst);
1981                        //记录连续休眠次数,因为任务导致的唤醒不会计数
1982                        sleep_count += 1;
1983                    }
1984                }
1985            }
1986            Some(task) => {
1987                //有任务,则执行
1988                sleep_count = 0; //重置连续休眠次数
1989                run_task(&runtime, task);
1990            }
1991        }
1992    }
1993
1994    //关闭当前工作者的任务池
1995    (runtime.0).1.close_worker();
1996    warn!(
1997        "Worker of runtime closed, runtime: {}, worker: {}, thread: {:?}",
1998        runtime.get_id(),
1999        index,
2000        thread::current()
2001    );
2002}
2003
2004//执行异步任务
2005#[inline]
2006fn run_task<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>(
2007    runtime: &MultiTaskRuntime<O, P>,
2008    task: Arc<AsyncTask<P, O>>,
2009) {
2010    let waker = waker_ref(&task);
2011    let mut context = Context::from_waker(&*waker);
2012    if let Some(mut future) = task.get_inner() {
2013        if let Poll::Pending = future.as_mut().poll(&mut context) {
2014            //当前未准备好,则恢复异步任务,以保证异步服务后续访问异步任务和异步任务不被提前释放
2015            task.set_inner(Some(future));
2016        }
2017    } else {
2018        //当前异步任务在唤醒时还未被重置内部任务,则继续加入当前异步运行时队列,并等待下次被执行
2019        (runtime.0).1.push(task);
2020    }
2021}
2022
2023// 定时器任务生产者
2024enum TimerTaskProducor<
2025    O: Default + 'static = (),
2026    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = StealableTaskPool<O>,
2027> {
2028    Local(Arc<AsyncTaskTimerByNotCancel<P, O>>),        //本地定时器任务生产者
2029    Foreign(Sender<(usize, AsyncTimingTask<P, O>)>),    //外部定时器任务生产者
2030}