pi_async_rt/rt/
single_thread.rs

1//! # 单线程运行时
2//!
3//! - [SingleTaskPool]\: 单线程任务池
4//! - [SingleTaskRunner]\: 单线程异步任务执行器
5//! - [SingleTaskRuntime]\: 异步单线程任务运行时
6//!
7//! [SingleTaskPool]: struct.SingleTaskPool.html
8//! [SingleTaskRunner]: struct.SingleTaskRunner.html
9//! [SingleTaskRuntime]: struct.SingleTaskRuntime.html
10//!
11//! # Examples
12//!
13//! ```
14//! use pi_async::prelude::{SingleTaskPool, SingleTaskRunner};
15//! let pool = SingleTaskPool::default();
16//! let rt = SingleTaskRunner::<(), SingleTaskPool<()>>::new(pool).into_local();
17//! let _ = rt.block_on(async move {});
18//! ```
19
20use std::thread;
21use std::sync::Arc;
22use std::vec::IntoIter;
23use std::future::Future;
24use std::cell::UnsafeCell;
25use std::task::{Context, Poll, Waker};
26use std::io::{Error, ErrorKind, Result};
27use std::collections::vec_deque::VecDeque;
28use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
29
30use async_stream::stream;
31use crossbeam_channel::Sender;
32use crossbeam_queue::SegQueue;
33use flume::bounded as async_bounded;
34use futures::{
35    future::{BoxFuture, FutureExt},
36    stream::{BoxStream, Stream, StreamExt},
37    task::waker_ref,
38};
39use parking_lot::{Condvar, Mutex};
40use quanta::Clock;
41
42use wrr::IWRRSelector;
43
44use super::{
45    PI_ASYNC_THREAD_LOCAL_ID, DEFAULT_MAX_HIGH_PRIORITY_BOUNDED, DEFAULT_HIGH_PRIORITY_BOUNDED, DEFAULT_MAX_LOW_PRIORITY_BOUNDED, alloc_rt_uid, AsyncMapReduce, AsyncPipelineResult, AsyncRuntime,
46    AsyncRuntimeExt, AsyncTask, AsyncTaskPool, AsyncTaskPoolExt, AsyncTaskTimer, AsyncWait,
47    AsyncWaitAny, AsyncWaitAnyCallback, AsyncWaitTimeout, LocalAsyncRuntime, TaskId, YieldNow
48};
49use crate::rt::{TaskHandle, AsyncTimingTask};
50
51///
52/// 单线程任务池
53///
54pub struct SingleTaskPool<O: Default + 'static> {
55    id:             usize,                                                      //绑定的运行时唯一id
56    public:         SegQueue<Arc<AsyncTask<SingleTaskPool<O>, O>>>,             //外部任务队列
57    internal:       UnsafeCell<VecDeque<Arc<AsyncTask<SingleTaskPool<O>, O>>>>, //内部任务队列
58    stack:          UnsafeCell<Vec<Arc<AsyncTask<SingleTaskPool<O>, O>>>>,      //本地任务栈
59    selector:       UnsafeCell<IWRRSelector<2>>,                                //任务池选择器
60    consume_count:  AtomicUsize,                                                //任务消费计数
61    produce_count:  AtomicUsize,                                                //任务生产计数
62    thread_waker:   Option<Arc<(AtomicBool, Mutex<()>, Condvar)>>,              //绑定线程的唤醒器
63}
64
65unsafe impl<O: Default + 'static> Send for SingleTaskPool<O> {}
66unsafe impl<O: Default + 'static> Sync for SingleTaskPool<O> {}
67
68impl<O: Default + 'static> Default for SingleTaskPool<O> {
69    fn default() -> Self {
70        SingleTaskPool::new([1, 1])
71    }
72}
73
74impl<O: Default + 'static> AsyncTaskPool<O> for SingleTaskPool<O> {
75    type Pool = SingleTaskPool<O>;
76
77    #[inline]
78    fn get_thread_id(&self) -> usize {
79        let rt_uid = self.id;
80        match PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
81            let current = unsafe { *thread_id.get() };
82            if current == usize::MAX {
83                //当前线程还未初始化运行时的线程id,则初始化
84                unsafe {
85                    *thread_id.get() = rt_uid << 32;
86                    *thread_id.get()
87                }
88            } else {
89                current
90            }
91        }) {
92            Err(e) => {
93                //不应该执行到这个分支
94                panic!(
95                    "Get thread id failed, thread: {:?}, reason: {:?}",
96                    thread::current(),
97                    e
98                );
99            }
100            Ok(id) => id,
101        }
102    }
103
104    #[inline]
105    fn len(&self) -> usize {
106        if let Some(len) = self
107            .produce_count
108            .load(Ordering::Relaxed)
109            .checked_sub(self.consume_count.load(Ordering::Relaxed))
110        {
111            len
112        } else {
113            0
114        }
115    }
116
117    #[inline]
118    fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
119        self.public.push(task);
120        self.produce_count.fetch_add(1, Ordering::Relaxed);
121        Ok(())
122    }
123
124    #[inline]
125    fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
126        let id = self.get_thread_id();
127        let rt_uid = task.owner();
128        if (id >> 32) == rt_uid {
129            //当前是运行时所在线程
130            unsafe {{
131                (&mut *self.internal.get()).push_back(task);
132            }}
133            self.produce_count.fetch_add(1, Ordering::Relaxed);
134            Ok(())
135        } else {
136            //当前不是运行时所在线程
137            self.push(task)
138        }
139    }
140
141    #[inline]
142    fn push_priority(&self,
143                     priority: usize,
144                     task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
145        if priority >= DEFAULT_MAX_HIGH_PRIORITY_BOUNDED {
146            //最高优先级
147            let id = self.get_thread_id();
148            let rt_uid = task.owner();
149            if (id >> 32) == rt_uid {
150                //当前是运行时所在线程
151                unsafe {
152                    let stack = (&mut *self.stack.get());
153                    if stack
154                        .capacity()
155                        .checked_sub(stack.len())
156                        .unwrap_or(0) >= 0 {
157                        //本地任务栈有空闲容量,则立即将任务加入本地任务栈
158                        (&mut *self.stack.get()).push(task);
159                    } else {
160                        //本地内部任务队列有空闲容量,则立即将任务加入本地内部任务队列
161                        (&mut *self.internal.get()).push_back(task);
162                    }
163                }
164
165                self.produce_count.fetch_add(1, Ordering::Relaxed);
166                Ok(())
167            } else {
168                //当前不是运行时所在线程
169                self.push(task)
170            }
171        } else if priority >= DEFAULT_HIGH_PRIORITY_BOUNDED {
172            //高优先级
173            self.push_local(task)
174        } else {
175            //低优先级
176            self.push(task)
177        }
178    }
179
180    #[inline]
181    fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
182        self.push_priority(DEFAULT_HIGH_PRIORITY_BOUNDED, task)
183    }
184
185    #[inline]
186    fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>> {
187        let task = unsafe { (&mut *self
188            .stack
189            .get())
190            .pop()
191        };
192        if task.is_some() {
193            //指定工作者的任务栈有任务,则立即返回任务
194            self.consume_count.fetch_add(1, Ordering::Relaxed);
195            return task;
196        }
197
198        //从指定工作者的任务队列中弹出任务
199        let task = try_pop_by_weight(self);
200        if task.is_some() {
201            self
202                .consume_count
203                .fetch_add(1, Ordering::Relaxed);
204        }
205        task
206    }
207
208    #[inline]
209    fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>> {
210        let mut all = Vec::with_capacity(self.len());
211
212        let internal = unsafe { (&mut *self.internal.get()) };
213        for _ in 0..internal.len() {
214            if let Some(task) = internal.pop_front() {
215                all.push(task);
216            }
217        }
218
219        let public_len = self.public.len();
220        for _ in 0..public_len {
221            if let Some(task) = self.public.pop() {
222                all.push(task);
223            }
224        }
225
226        all.into_iter()
227    }
228
229    #[inline]
230    fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>> {
231        self.thread_waker.as_ref()
232    }
233}
234
235// 尝试通过统计信息更新权重,根据权重选择从本地外部任务队列或本地内部任务队列中弹出任务
236fn try_pop_by_weight<O: Default + 'static>(pool: &SingleTaskPool<O>)
237    -> Option<Arc<AsyncTask<SingleTaskPool<O>, O>>> {
238    unsafe {
239        //根据权重选择从指定的任务队列弹出任务
240        match (&mut *pool.selector.get()).select() {
241            0 => {
242                //弹出外部任务
243                let task = try_pop_external(pool);
244                if task.is_some() {
245                    task
246                } else {
247                    //当前没有外部任务,则尝试弹出内部任务
248                    try_pop_internal(pool)
249                }
250            },
251            _ => {
252                //弹出内部任务
253                let task = try_pop_internal(pool);
254                if task.is_some() {
255                    task
256                } else {
257                    //当前没有内部任务,则尝试弹出外部任务
258                    try_pop_external(pool)
259                }
260            },
261        }
262    }
263}
264
265// 尝试弹出内部任务队列的任务
266#[inline]
267fn try_pop_internal<O: Default + 'static>(pool: &SingleTaskPool<O>)
268    -> Option<Arc<AsyncTask<SingleTaskPool<O>, O>>> {
269    unsafe { (&mut *pool.internal.get()).pop_front() }
270}
271
272// 尝试弹出外部任务队列的任务
273#[inline]
274fn try_pop_external<O: Default + 'static>(pool: &SingleTaskPool<O>)
275                                          -> Option<Arc<AsyncTask<SingleTaskPool<O>, O>>> {
276    pool.public.pop()
277}
278
279impl<O: Default + 'static> AsyncTaskPoolExt<O> for SingleTaskPool<O> {
280    fn set_thread_waker(&mut self, thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) {
281        self.thread_waker = Some(thread_waker);
282    }
283}
284
285impl<O: Default + 'static> SingleTaskPool<O> {
286    /// 构建指定权重的单线程任务池
287    pub fn new(weights: [u8; 2]) -> Self {
288        let id = alloc_rt_uid();
289        let public = SegQueue::new();
290        let internal = UnsafeCell::new(VecDeque::new());
291        let stack = UnsafeCell::new(Vec::with_capacity(1));
292        let selector = UnsafeCell::new(IWRRSelector::new(weights));
293        let consume_count = AtomicUsize::new(0);
294        let produce_count = AtomicUsize::new(0);
295
296        SingleTaskPool {
297            id,
298            public,
299            internal,
300            stack,
301            selector,
302            consume_count,
303            produce_count,
304            thread_waker: Some(Arc::new((
305                AtomicBool::new(false),
306                Mutex::new(()),
307                Condvar::new(),
308            ))),
309        }
310    }
311}
312
313///
314/// 异步单线程任务运行时
315///
316pub struct SingleTaskRuntime<
317    O: Default + 'static = (),
318    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = SingleTaskPool<O>,
319>(
320    Arc<(
321        usize,                                  //运行时唯一id
322        Arc<P>,                                 //异步任务池
323        Sender<(usize, AsyncTimingTask<P, O>)>, //休眠的异步任务生产者
324        AsyncTaskTimer<P, O>,                   //本地定时器
325        AtomicUsize,                            //定时器任务生产计数
326        AtomicUsize,                            //定时器任务消费计数
327    )>,
328);
329
330unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Send
331    for SingleTaskRuntime<O, P>
332{
333}
334unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Sync
335    for SingleTaskRuntime<O, P>
336{
337}
338
339impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Clone
340    for SingleTaskRuntime<O, P>
341{
342    fn clone(&self) -> Self {
343        SingleTaskRuntime(self.0.clone())
344    }
345}
346
347impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>> AsyncRuntime<O>
348    for SingleTaskRuntime<O, P>
349{
350    type Pool = P;
351
352    /// 共享运行时内部任务池
353    fn shared_pool(&self) -> Arc<Self::Pool> {
354        (self.0).1.clone()
355    }
356
357    /// 获取当前异步运行时的唯一id
358    fn get_id(&self) -> usize {
359        (self.0).0
360    }
361
362    /// 获取当前异步运行时待处理任务数量
363    fn wait_len(&self) -> usize {
364        (self.0)
365            .4
366            .load(Ordering::Relaxed)
367            .checked_sub((self.0).5.load(Ordering::Relaxed))
368            .unwrap_or(0)
369    }
370
371    /// 获取当前异步运行时任务数量
372    fn len(&self) -> usize {
373        (self.0).1.len()
374    }
375
376    /// 分配异步任务的唯一id
377    fn alloc<R: 'static>(&self) -> TaskId {
378        TaskId(UnsafeCell::new((TaskHandle::<R>::default().into_raw() as u128) << 64 | self.get_id() as u128 & 0xffffffffffffffff))
379    }
380
381    /// 派发一个指定的异步任务到异步运行时
382    fn spawn<F>(&self, future: F) -> Result<TaskId>
383    where
384        F: Future<Output = O> + Send + 'static,
385    {
386        let task_id = self.alloc::<F::Output>();
387        if let Err(e) = self.spawn_by_id(task_id.clone(), future) {
388            return Err(e);
389        }
390
391        Ok(task_id)
392    }
393
394    /// 派发一个异步任务到本地异步运行时,如果本地没有本异步运行时,则会派发到当前运行时中
395    fn spawn_local<F>(&self, future: F) -> Result<TaskId>
396        where
397            F: Future<Output = O> + Send + 'static {
398        let task_id = self.alloc::<F::Output>();
399        if let Err(e) = self.spawn_local_by_id(task_id.clone(), future) {
400            return Err(e);
401        }
402
403        Ok(task_id)
404    }
405
406    /// 派发一个指定优先级的异步任务到异步运行时
407    fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
408        where
409            F: Future<Output = O> + Send + 'static {
410        let task_id = self.alloc::<F::Output>();
411        if let Err(e) = self.spawn_priority_by_id(task_id.clone(), priority, future) {
412            return Err(e);
413        }
414
415        Ok(task_id)
416    }
417
418    /// 派发一个异步任务到异步运行时,并立即让出任务的当前运行
419    fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
420        where
421            F: Future<Output = O> + Send + 'static {
422        let task_id = self.alloc::<F::Output>();
423        if let Err(e) = self.spawn_yield_by_id(task_id.clone(), future) {
424            return Err(e);
425        }
426
427        Ok(task_id)
428    }
429
430    /// 派发一个在指定时间后执行的异步任务到异步运行时,时间单位ms
431    fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
432    where
433        F: Future<Output = O> + Send + 'static,
434    {
435        let task_id = self.alloc::<F::Output>();
436        if let Err(e) = self.spawn_timing_by_id(task_id.clone(), future, time) {
437            return Err(e);
438        }
439
440        Ok(task_id)
441    }
442
443    /// 派发一个指定任务唯一id的异步任务到异步运行时
444    fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
445        where
446            F: Future<Output = O> + Send + 'static {
447        if let Err(e) = (self.0).1.push(Arc::new(AsyncTask::new(
448            task_id,
449            (self.0).1.clone(),
450            DEFAULT_MAX_LOW_PRIORITY_BOUNDED,
451            Some(future.boxed()),
452        ))) {
453            return Err(Error::new(ErrorKind::Other, e));
454        }
455
456        Ok(())
457    }
458
459    /// 派发一个指定任务唯一id的异步任务到本地异步运行时,如果本地没有本异步运行时,则会派发到当前运行时中
460    fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
461        where
462            F: Future<Output = O> + Send + 'static {
463        (self.0).1.push_local(Arc::new(AsyncTask::new(
464            task_id,
465            (self.0).1.clone(),
466            DEFAULT_HIGH_PRIORITY_BOUNDED,
467            Some(future.boxed()))))
468    }
469
470    /// 派发一个指定任务唯一id和任务优先级的异步任务到异步运行时
471    fn spawn_priority_by_id<F>(&self,
472                               task_id: TaskId,
473                               priority: usize,
474                               future: F) -> Result<()>
475        where
476            F: Future<Output = O> + Send + 'static {
477        (self.0).1.push_priority(priority, Arc::new(AsyncTask::new(
478            task_id,
479            (self.0).1.clone(),
480            priority,
481            Some(future.boxed()))))
482    }
483
484    /// 派发一个指定任务唯一id的异步任务到异步运行时,并立即让出任务的当前运行
485    #[inline]
486    fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
487        where
488            F: Future<Output = O> + Send + 'static {
489        self.spawn_priority_by_id(task_id,
490                                  DEFAULT_HIGH_PRIORITY_BOUNDED,
491                                  future)
492    }
493
494    /// 派发一个指定任务唯一id和在指定时间后执行的异步任务到异步运行时,时间单位ms
495    fn spawn_timing_by_id<F>(&self,
496                             task_id: TaskId,
497                             future: F,
498                             time: usize) -> Result<()>
499        where
500            F: Future<Output = O> + Send + 'static {
501        let rt = self.clone();
502        self.spawn_by_id(task_id, async move {
503            (rt.0).3.set_timer(
504                AsyncTimingTask::WaitRun(Arc::new(AsyncTask::new(
505                    rt.alloc::<F::Output>(),
506                    (rt.0).1.clone(),
507                    DEFAULT_HIGH_PRIORITY_BOUNDED,
508                    Some(future.boxed()),
509                ))),
510                time,
511            );
512
513            (rt.0).4.fetch_add(1, Ordering::Relaxed);
514            Default::default()
515        })
516    }
517
518    /// 挂起指定唯一id的异步任务
519    fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output> {
520        task_id.set_waker::<Output>(waker);
521        Poll::Pending
522    }
523
524    /// 唤醒指定唯一id的异步任务
525    fn wakeup<Output: 'static>(&self, task_id: &TaskId) {
526        task_id.wakeup::<Output>();
527    }
528
529    /// 挂起当前异步运行时的当前任务,并在指定的其它运行时上派发一个指定的异步任务,等待其它运行时上的异步任务完成后,唤醒当前运行时的当前任务,并返回其它运行时上的异步任务的值
530    fn wait<V: Send + 'static>(&self) -> AsyncWait<V> {
531        AsyncWait(self.wait_any(2))
532    }
533
534    /// 挂起当前异步运行时的当前任务,并在多个其它运行时上执行多个其它任务,其中任意一个任务完成,则唤醒当前运行时的当前任务,并返回这个已完成任务的值,而其它未完成的任务的值将被忽略
535    fn wait_any<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
536        let (producor, consumer) = async_bounded(capacity);
537
538        AsyncWaitAny {
539            capacity,
540            producor,
541            consumer,
542        }
543    }
544
545    /// 挂起当前异步运行时的当前任务,并在多个其它运行时上执行多个其它任务,任务返回后需要通过用户指定的检查回调进行检查,其中任意一个任务检查通过,则唤醒当前运行时的当前任务,并返回这个已完成任务的值,而其它未完成或未检查通过的任务的值将被忽略,如果所有任务都未检查通过,则强制唤醒当前运行时的当前任务
546    fn wait_any_callback<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
547        let (producor, consumer) = async_bounded(capacity);
548
549        AsyncWaitAnyCallback {
550            capacity,
551            producor,
552            consumer,
553        }
554    }
555
556    /// 构建用于派发多个异步任务到指定运行时的映射归并,需要指定映射归并的容量
557    fn map_reduce<V: Send + 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
558        let (producor, consumer) = async_bounded(capacity);
559
560        AsyncMapReduce {
561            count: 0,
562            capacity,
563            producor,
564            consumer,
565        }
566    }
567
568    /// 挂起当前异步运行时的当前任务,等待指定的时间后唤醒当前任务
569    fn timeout(&self, timeout: usize) -> BoxFuture<'static, ()> {
570        let rt = self.clone();
571        let producor = (self.0).2.clone();
572
573        AsyncWaitTimeout::new(rt, producor, timeout).boxed()
574    }
575
576    /// 立即让出当前任务的执行
577    fn yield_now(&self) -> BoxFuture<'static, ()> {
578        async move {
579            YieldNow(false).await;
580        }.boxed()
581    }
582
583    /// 生成一个异步管道,输入指定流,输入流的每个值通过过滤器生成输出流的值
584    fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> BoxStream<'static, FO>
585    where
586        S: Stream<Item = SO> + Send + 'static,
587        SO: Send + 'static,
588        F: FnMut(SO) -> AsyncPipelineResult<FO> + Send + 'static,
589        FO: Send + 'static,
590    {
591        let output = stream! {
592            for await value in input {
593                match filter(value) {
594                    AsyncPipelineResult::Disconnect => {
595                        //立即中止管道
596                        break;
597                    },
598                    AsyncPipelineResult::Filtered(result) => {
599                        yield result;
600                    },
601                }
602            }
603        };
604
605        output.boxed()
606    }
607
608    /// 关闭异步运行时,返回请求关闭是否成功
609    fn close(&self) -> bool {
610        false
611    }
612}
613
614impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>> AsyncRuntimeExt<O>
615    for SingleTaskRuntime<O, P>
616{
617    fn spawn_with_context<F, C>(&self, task_id: TaskId, future: F, context: C) -> Result<()>
618    where
619        F: Future<Output = O> + Send + 'static,
620        C: 'static,
621    {
622        if let Err(e) = (self.0).1.push(Arc::new(AsyncTask::with_context(
623            task_id,
624            (self.0).1.clone(),
625            DEFAULT_MAX_LOW_PRIORITY_BOUNDED,
626            Some(future.boxed()),
627            context,
628        ))) {
629            return Err(Error::new(ErrorKind::Other, e));
630        }
631
632        Ok(())
633    }
634
635    fn spawn_timing_with_context<F, C>(
636        &self,
637        task_id: TaskId,
638        future: F,
639        context: C,
640        time: usize,
641    ) -> Result<()>
642    where
643        F: Future<Output = O> + Send + 'static,
644        C: Send + 'static,
645    {
646        let rt = self.clone();
647        self.spawn_by_id(task_id, async move {
648            (rt.0).3.set_timer(
649                AsyncTimingTask::WaitRun(Arc::new(AsyncTask::with_context(
650                    rt.alloc::<F::Output>(),
651                    (rt.0).1.clone(),
652                    DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
653                    Some(future.boxed()),
654                    context,
655                ))),
656                time,
657            );
658
659            (rt.0).4.fetch_add(1, Ordering::Relaxed);
660            Default::default()
661        })
662    }
663
664    fn block_on<F>(&self, future: F) -> Result<F::Output>
665    where
666        F: Future + Send + 'static,
667        <F as Future>::Output: Default + Send + 'static,
668    {
669        let runner = SingleTaskRunner {
670            is_running: AtomicBool::new(true),
671            runtime: self.clone(),
672            clock: Clock::new(),
673        };
674        let mut result: Option<<F as Future>::Output> = None;
675        let result_raw = (&mut result) as *mut Option<<F as Future>::Output> as usize;
676
677        self.spawn(async move {
678            //在指定运行时中执行,并返回结果
679            let r = future.await;
680            unsafe {
681                *(result_raw as *mut Option<<F as Future>::Output>) = Some(r);
682            }
683
684            Default::default()
685        });
686
687        loop {
688            //执行异步任务
689            while runner.run()? > 0 {}
690
691            //尝试获取异步任务的执行结果
692            if let Some(result) = result.take() {
693                //异步任务已完成,则立即返回执行结果
694                return Ok(result);
695            }
696        }
697    }
698}
699
700impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>
701    SingleTaskRuntime<O, P>
702{
703    /// 获取当前单线程异步运行时的本地异步运行时
704    pub fn to_local_runtime(&self) -> LocalAsyncRuntime<O> {
705        LocalAsyncRuntime {
706            inner: self.as_raw(),
707            get_id_func: SingleTaskRuntime::<O, P>::get_id_raw,
708            spawn_func: SingleTaskRuntime::<O, P>::spawn_raw,
709            spawn_timing_func: SingleTaskRuntime::<O, P>::spawn_timing_raw,
710            timeout_func: SingleTaskRuntime::<O, P>::timeout_raw,
711        }
712    }
713
714    // 获取当前单线程异步运行时的指针
715    #[inline]
716    pub(crate) fn as_raw(&self) -> *const () {
717        Arc::into_raw(self.0.clone()) as *const ()
718    }
719
720    // 获取指定指针的单线程异步运行时
721    #[inline]
722    pub(crate) fn from_raw(raw: *const ()) -> Self {
723        let inner = unsafe {
724            Arc::from_raw(
725                raw as *const (
726                    usize,
727                    Arc<P>,
728                    Sender<(usize, AsyncTimingTask<P, O>)>,
729                    AsyncTaskTimer<P, O>,
730                    AtomicUsize,
731                    AtomicUsize,
732                ),
733            )
734        };
735        SingleTaskRuntime(inner)
736    }
737
738    // 获取当前异步运行时的唯一id
739    pub(crate) fn get_id_raw(raw: *const ()) -> usize {
740        let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
741        let id = rt.get_id();
742        Arc::into_raw(rt.0); //避免提前释放
743        id
744    }
745
746    // 派发一个指定的异步任务到异步运行时
747    pub(crate) fn spawn_raw(raw: *const (), future: BoxFuture<'static, O>) -> Result<()> {
748        let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
749        let result = rt.spawn_by_id(rt.alloc::<O>(), future);
750        Arc::into_raw(rt.0); //避免提前释放
751        result
752    }
753
754    // 定时派发一个指定的异步任务到异步运行时
755    pub(crate) fn spawn_timing_raw(
756        raw: *const (),
757        future: BoxFuture<'static, O>,
758        timeout: usize,
759    ) -> Result<()> {
760        let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
761        let result = rt.spawn_timing_by_id(rt.alloc::<O>(), future, timeout);
762        Arc::into_raw(rt.0); //避免提前释放
763        result
764    }
765
766    // 挂起当前异步运行时的当前任务,等待指定的时间后唤醒当前任务
767    pub(crate) fn timeout_raw(raw: *const (), timeout: usize) -> BoxFuture<'static, ()> {
768        let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
769        let boxed = rt.timeout(timeout);
770        Arc::into_raw(rt.0); //避免提前释放
771        boxed
772    }
773}
774
775///
776/// 单线程异步任务执行器
777///
778pub struct SingleTaskRunner<
779    O: Default + 'static,
780    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = SingleTaskPool<O>,
781> {
782    is_running: AtomicBool,                 //是否开始运行
783    runtime:    SingleTaskRuntime<O, P>,    //异步单线程任务运行时
784    clock:      Clock,                      //执行器的时钟
785}
786
787unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Send
788    for SingleTaskRunner<O, P>
789{
790}
791unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Sync
792    for SingleTaskRunner<O, P>
793{
794}
795
796impl<O: Default + 'static> Default for SingleTaskRunner<O> {
797    fn default() -> Self {
798        SingleTaskRunner::new(SingleTaskPool::default())
799    }
800}
801
802impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>
803    SingleTaskRunner<O, P>
804{
805    /// 用指定的任务池构建单线程异步运行时
806    pub fn new(pool: P) -> Self {
807        let rt_uid = pool.get_thread_id() >> 32;
808        let pool = Arc::new(pool);
809
810        //构建本地定时器和定时异步任务生产者
811        let timer = AsyncTaskTimer::new();
812        let producor = timer.producor.clone();
813        let timer_producor_count = AtomicUsize::new(0);
814        let timer_consume_count = AtomicUsize::new(0);
815
816        //构建单线程任务运行时
817        let runtime = SingleTaskRuntime(Arc::new((rt_uid,
818                                                  pool,
819                                                  producor,
820                                                  timer,
821                                                  timer_producor_count,
822                                                  timer_consume_count)));
823
824        SingleTaskRunner {
825            is_running: AtomicBool::new(false),
826            runtime,
827            clock: Clock::new(),
828        }
829    }
830
831    /// 获取单线程异步任务执行器的线程唤醒器
832    pub fn get_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
833        (self.runtime.0).1.get_thread_waker().cloned()
834    }
835
836    /// 启动单线程异步任务执行器
837    pub fn startup(&self) -> Option<SingleTaskRuntime<O, P>> {
838        if cfg!(target_arch = "aarch64") {
839            match self
840                .is_running
841                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
842            {
843                Ok(false) => {
844                    //未启动,则启动,并返回单线程异步运行时
845                    Some(self.runtime.clone())
846                }
847                _ => {
848                    //已启动,则忽略
849                    None
850                }
851            }
852        } else {
853            match self.is_running.compare_exchange_weak(
854                false,
855                true,
856                Ordering::SeqCst,
857                Ordering::SeqCst,
858            ) {
859                Ok(false) => {
860                    //未启动,则启动,并返回单线程异步运行时
861                    Some(self.runtime.clone())
862                }
863                _ => {
864                    //已启动,则忽略
865                    None
866                }
867            }
868        }
869    }
870
871    /// 运行一次单线程异步任务执行器,返回当前任务池中任务的数量
872    pub fn run_once(&self) -> Result<usize> {
873        if !self.is_running.load(Ordering::Relaxed) {
874            //未启动,则返回错误原因
875            return Err(Error::new(
876                ErrorKind::Other,
877                "Single thread runtime not running",
878            ));
879        }
880
881        //设置新的定时任务,并唤醒已过期的定时任务
882        let mut pop_len = 0;
883        (self.runtime.0)
884            .4
885            .fetch_add((self.runtime.0).3.consume(),
886                       Ordering::Relaxed);
887        loop {
888            let current_time = (self.runtime.0).3.is_require_pop();
889            if let Some(current_time) = current_time {
890                //当前有到期的定时异步任务,则只处理到期的一个定时异步任务
891                let timed_out = (self.runtime.0).3.pop(current_time);
892                if let Some((handle, timing_task)) = timed_out {
893                    match timing_task {
894                        AsyncTimingTask::Pended(expired) => {
895                            //唤醒休眠的异步任务,并立即执行
896                            self.runtime.wakeup::<O>(&expired);
897                            if let Some(task) = (self.runtime.0).1.try_pop() {
898                                run_task(task);
899                            }
900                        }
901                        AsyncTimingTask::WaitRun(expired) => {
902                            //立即执行到期的定时异步任务,并立即执行
903                            (self.runtime.0).1.push_priority(handle, expired);
904                            if let Some(task) = (self.runtime.0).1.try_pop() {
905                                run_task(task);
906                            }
907                        }
908                    }
909                    pop_len += 1;
910                }
911            } else {
912                //当前没有到期的定时异步任务,则退出本次定时异步任务处理
913                break;
914            }
915        }
916        (self.runtime.0)
917            .5
918            .fetch_add(pop_len,
919                       Ordering::Relaxed);
920
921        //继续执行当前任务池中的一个异步任务
922        match (self.runtime.0).1.try_pop() {
923            None => {
924                //当前没有异步任务,则立即返回
925                return Ok(0);
926            }
927            Some(task) => {
928                run_task(task);
929            }
930        }
931
932        Ok((self.runtime.0).1.len())
933    }
934
935    /// 运行单线程异步任务执行器,并执行任务池中的所有任务
936    pub fn run(&self) -> Result<usize> {
937        if !self.is_running.load(Ordering::Relaxed) {
938            //未启动,则返回错误原因
939            return Err(Error::new(
940                ErrorKind::Other,
941                "Single thread runtime not running",
942            ));
943        }
944
945        loop {
946            //设置新的定时任务,并唤醒已过期的定时任务
947            let mut pop_len = 0;
948            let mut start_run_millis = self.clock.recent(); //重置开运行时长
949            (self.runtime.0)
950                .4
951                .fetch_add((self.runtime.0).3.consume(),
952                           Ordering::Relaxed);
953            loop {
954                let current_time = (self.runtime.0).3.is_require_pop();
955                if let Some(current_time) = current_time {
956                    //当前有到期的定时异步任务,则只处理到期的一个定时异步任务
957                    let timed_out = (self.runtime.0).3.pop(current_time);
958                    if let Some((handle, timing_task)) = timed_out {
959                        match timing_task {
960                            AsyncTimingTask::Pended(expired) => {
961                                //唤醒休眠的异步任务,并立即执行
962                                self.runtime.wakeup::<O>(&expired);
963                                if let Some(task) = (self.runtime.0).1.try_pop() {
964                                    run_task(task);
965                                }
966                            }
967                            AsyncTimingTask::WaitRun(expired) => {
968                                //立即执行到期的定时异步任务,并立即执行
969                                (self.runtime.0).1.push_priority(handle, expired);
970                                if let Some(task) = (self.runtime.0).1.try_pop() {
971                                    run_task(task);
972                                }
973                            }
974                        }
975                        pop_len += 1;
976                    }
977                } else {
978                    //当前没有到期的定时异步任务,则退出本次定时异步任务处理
979                    break;
980                }
981            }
982            (self.runtime.0)
983                .5
984                .fetch_add(pop_len,
985                           Ordering::Relaxed);
986
987            //继续执行当前任务池中的一个异步任务
988            while self
989                .clock
990                .recent()
991                .duration_since(start_run_millis)
992                .as_millis() < 1 {
993                match (self.runtime.0).1.try_pop() {
994                    None => {
995                        //当前没有异步任务,则立即返回
996                        return Ok((self.runtime.0).1.len());
997                    }
998                    Some(task) => {
999                        run_task(task);
1000                    }
1001                }
1002            }
1003        }
1004    }
1005
1006    /// 转换为本地异步单线程任务运行时
1007    pub fn into_local(self) -> SingleTaskRuntime<O, P> {
1008        self.runtime
1009    }
1010}
1011
1012//执行异步任务
1013#[inline]
1014fn run_task<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>(
1015    task: Arc<AsyncTask<P, O>>,
1016) {
1017    let waker = waker_ref(&task);
1018    let mut context = Context::from_waker(&*waker);
1019    if let Some(mut future) = task.get_inner() {
1020        if let Poll::Pending = future.as_mut().poll(&mut context) {
1021            //当前未准备好,则恢复异步任务,以保证异步服务后续访问异步任务和异步任务不被提前释放
1022            task.set_inner(Some(future));
1023        }
1024    }
1025}