pi_async_rt/rt/
serial_single_thread.rs

1//! # 单线程运行时
2//!
3//! # 特征:
4//!
5//! - [SingleTaskPool]\: 单线程任务池
6//! - [SingleTaskRunner]\: 单线程异步任务执行器
7//! - [SingleTaskRuntime]\: 异步单线程任务运行时
8//!
9//! [SingleTaskPool]: struct.SingleTaskPool.html
10//! [SingleTaskRunner]: struct.SingleTaskRunner.html
11//! [SingleTaskRuntime]: struct.SingleTaskRuntime.html
12//!
13
14use std::thread;
15use std::sync::Arc;
16use std::vec::IntoIter;
17use std::future::Future;
18use std::cell::UnsafeCell;
19use std::io::{Error, ErrorKind, Result};
20use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
21use std::task::{Context, Poll, Waker};
22use std::collections::vec_deque::VecDeque;
23
24use async_stream::stream;
25use crossbeam_channel::Sender;
26use crossbeam_queue::SegQueue;
27use flume::bounded as async_bounded;
28use futures::{
29    future::{FutureExt, LocalBoxFuture},
30    stream::{LocalBoxStream, Stream, StreamExt},
31    task::waker_ref,
32};
33use parking_lot::{Condvar, Mutex};
34use quanta::Clock;
35
36use wrr::IWRRSelector;
37
38use crate::{
39    rt::{
40        PI_ASYNC_THREAD_LOCAL_ID, DEFAULT_MAX_HIGH_PRIORITY_BOUNDED, DEFAULT_HIGH_PRIORITY_BOUNDED, DEFAULT_MAX_LOW_PRIORITY_BOUNDED, alloc_rt_uid,
41        serial::{AsyncMapReduce, AsyncRuntime, AsyncRuntimeExt,
42                 AsyncTask, AsyncTaskPool, AsyncTaskPoolExt, AsyncTaskTimer, AsyncTimingTask, AsyncWait,
43                 AsyncWaitAny, AsyncWaitAnyCallback, AsyncWaitTimeout,
44                 LocalAsyncRuntime,
45        },
46        AsyncPipelineResult, TaskId, TaskHandle, YieldNow
47    },
48};
49
50///
51/// 单线程任务池
52///
53pub struct SingleTaskPool<O: Default + 'static> {
54    id:             usize,                                                      //绑定的线程唯一id
55    public:         SegQueue<Arc<AsyncTask<SingleTaskPool<O>, O>>>,             //外部任务队列
56    internal:       UnsafeCell<VecDeque<Arc<AsyncTask<SingleTaskPool<O>, O>>>>, //内部任务队列
57    stack:          UnsafeCell<Vec<Arc<AsyncTask<SingleTaskPool<O>, O>>>>,      //本地任务栈
58    selector:       UnsafeCell<IWRRSelector<2>>,                                //任务池选择器
59    consume_count:  AtomicUsize,                                                //任务消费计数
60    produce_count:  AtomicUsize,                                                //任务生产计数
61    thread_waker:   Option<Arc<(AtomicBool, Mutex<()>, Condvar)>>,              //绑定线程的唤醒器
62}
63
64unsafe impl<O: Default + 'static> Send for SingleTaskPool<O> {}
65unsafe impl<O: Default + 'static> Sync for SingleTaskPool<O> {}
66
67impl<O: Default + 'static> Default for SingleTaskPool<O> {
68    fn default() -> Self {
69        SingleTaskPool::new([1, 1])
70    }
71}
72
73impl<O: Default + 'static> AsyncTaskPool<O> for SingleTaskPool<O> {
74    type Pool = SingleTaskPool<O>;
75
76    #[inline]
77    fn get_thread_id(&self) -> usize {
78        let rt_uid = self.id;
79        match PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
80            let current = unsafe { *thread_id.get() };
81            if current == usize::MAX {
82                //当前线程还未初始化运行时的线程id,则初始化
83                unsafe {
84                    *thread_id.get() = rt_uid << 32;
85                    *thread_id.get()
86                }
87            } else {
88                current
89            }
90        }) {
91            Err(e) => {
92                //不应该执行到这个分支
93                panic!(
94                    "Get thread id failed, thread: {:?}, reason: {:?}",
95                    thread::current(),
96                    e
97                );
98            }
99            Ok(id) => id,
100        }
101    }
102
103    #[inline]
104    fn len(&self) -> usize {
105        if let Some(len) = self
106            .produce_count
107            .load(Ordering::Relaxed)
108            .checked_sub(self.consume_count.load(Ordering::Relaxed))
109        {
110            len
111        } else {
112            0
113        }
114    }
115
116    #[inline]
117    fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
118        self.public.push(task);
119        self.produce_count.fetch_add(1, Ordering::Relaxed);
120        Ok(())
121    }
122
123    #[inline]
124    fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
125        let id = self.get_thread_id();
126        let rt_uid = task.owner();
127        if (id >> 32) == rt_uid {
128            //当前是运行时所在线程
129            unsafe {{
130                (&mut *self.internal.get()).push_back(task);
131            }}
132            self.produce_count.fetch_add(1, Ordering::Relaxed);
133            Ok(())
134        } else {
135            //当前不是运行时所在线程
136            self.push(task)
137        }
138    }
139
140    #[inline]
141    fn push_priority(&self,
142                     priority: usize,
143                     task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
144        if priority >= DEFAULT_MAX_HIGH_PRIORITY_BOUNDED {
145            //最高优先级
146            let id = self.get_thread_id();
147            let rt_uid = task.owner();
148            if (id >> 32) == rt_uid {
149                //当前是运行时所在线程
150                unsafe {
151                    let stack = (&mut *self.stack.get());
152                    if stack
153                        .capacity()
154                        .checked_sub(stack.len())
155                        .unwrap_or(0) >= 0 {
156                        //本地任务栈有空闲容量,则立即将任务加入本地任务栈
157                        (&mut *self.stack.get()).push(task);
158                    } else {
159                        //本地内部任务队列有空闲容量,则立即将任务加入本地内部任务队列
160                        (&mut *self.internal.get()).push_back(task);
161                    }
162                }
163
164                self.produce_count.fetch_add(1, Ordering::Relaxed);
165                Ok(())
166            } else {
167                //当前不是运行时所在线程
168                self.push(task)
169            }
170        } else if priority >= DEFAULT_HIGH_PRIORITY_BOUNDED {
171            //高优先级
172            self.push_local(task)
173        } else {
174            //低优先级
175            self.push(task)
176        }
177    }
178
179    #[inline]
180    fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
181        self.push_priority(DEFAULT_HIGH_PRIORITY_BOUNDED, task)
182    }
183
184    #[inline]
185    fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>> {
186        let task = unsafe { (&mut *self
187            .stack
188            .get())
189            .pop()
190        };
191        if task.is_some() {
192            //指定工作者的任务栈有任务,则立即返回任务
193            self.consume_count.fetch_add(1, Ordering::Relaxed);
194            return task;
195        }
196
197        //从指定工作者的任务队列中弹出任务
198        let task = try_pop_by_weight(self);
199        if task.is_some() {
200            self
201                .consume_count
202                .fetch_add(1, Ordering::Relaxed);
203        }
204        task
205    }
206
207    #[inline]
208    fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>> {
209        let mut all = Vec::with_capacity(self.len());
210
211        let internal = unsafe { (&mut *self.internal.get()) };
212        for _ in 0..internal.len() {
213            if let Some(task) = internal.pop_front() {
214                all.push(task);
215            }
216        }
217
218        let public_len = self.public.len();
219        for _ in 0..public_len {
220            if let Some(task) = self.public.pop() {
221                all.push(task);
222            }
223        }
224
225        all.into_iter()
226    }
227
228    #[inline]
229    fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>> {
230        self.thread_waker.as_ref()
231    }
232}
233
234// 尝试通过统计信息更新权重,根据权重选择从本地外部任务队列或本地内部任务队列中弹出任务
235fn try_pop_by_weight<O: Default + 'static>(pool: &SingleTaskPool<O>)
236    -> Option<Arc<AsyncTask<SingleTaskPool<O>, O>>> {
237    unsafe {
238        //根据权重选择从指定的任务队列弹出任务
239        match (&mut *pool.selector.get()).select() {
240            0 => {
241                //弹出外部任务
242                let task = try_pop_external(pool);
243                if task.is_some() {
244                    task
245                } else {
246                    //当前没有外部任务,则尝试弹出内部任务
247                    try_pop_internal(pool)
248                }
249            },
250            _ => {
251                //弹出内部任务
252                let task = try_pop_internal(pool);
253                if task.is_some() {
254                    task
255                } else {
256                    //当前没有内部任务,则尝试弹出外部任务
257                    try_pop_external(pool)
258                }
259            },
260        }
261    }
262}
263
264// 尝试弹出内部任务队列的任务
265#[inline]
266fn try_pop_internal<O: Default + 'static>(pool: &SingleTaskPool<O>)
267    -> Option<Arc<AsyncTask<SingleTaskPool<O>, O>>> {
268    unsafe { (&mut *pool.internal.get()).pop_front() }
269}
270
271// 尝试弹出外部任务队列的任务
272#[inline]
273fn try_pop_external<O: Default + 'static>(pool: &SingleTaskPool<O>)
274    -> Option<Arc<AsyncTask<SingleTaskPool<O>, O>>> {
275    pool.public.pop()
276}
277
278impl<O: Default + 'static> AsyncTaskPoolExt<O> for SingleTaskPool<O> {
279    fn set_thread_waker(&mut self, thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) {
280        self.thread_waker = Some(thread_waker);
281    }
282}
283
284impl<O: Default + 'static> SingleTaskPool<O> {
285    /// 构建指定权重的单线程任务池
286    pub fn new(weights: [u8; 2]) -> Self {
287        let rt_uid = alloc_rt_uid();
288        let public = SegQueue::new();
289        let internal = UnsafeCell::new(VecDeque::new());
290        let stack = UnsafeCell::new(Vec::with_capacity(1));
291        let selector = UnsafeCell::new(IWRRSelector::new(weights));
292        let consume_count = AtomicUsize::new(0);
293        let produce_count = AtomicUsize::new(0);
294
295        SingleTaskPool {
296            id: (rt_uid << 8) & 0xffff | 1,
297            public,
298            internal,
299            stack,
300            selector,
301            consume_count,
302            produce_count,
303            thread_waker: Some(Arc::new((
304                AtomicBool::new(false),
305                Mutex::new(()),
306                Condvar::new(),
307            ))),
308        }
309    }
310}
311
312///
313/// 异步单线程任务运行时
314///
315pub struct SingleTaskRuntime<
316    O: Default + 'static = (),
317    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = SingleTaskPool<O>,
318>(
319    Arc<(
320        usize,                                  //运行时唯一id
321        Arc<P>,                                 //异步任务池
322        Sender<(usize, AsyncTimingTask<P, O>)>, //休眠的异步任务生产者
323        AsyncTaskTimer<P, O>,                   //本地定时器
324        AtomicUsize,                            //定时器任务生产计数
325        AtomicUsize,                            //定时器任务消费计数
326    )>,
327);
328
329unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Send
330    for SingleTaskRuntime<O, P>
331{
332}
333unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Sync
334    for SingleTaskRuntime<O, P>
335{
336}
337
338impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Clone
339    for SingleTaskRuntime<O, P>
340{
341    fn clone(&self) -> Self {
342        SingleTaskRuntime(self.0.clone())
343    }
344}
345
346impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>> AsyncRuntime<O>
347    for SingleTaskRuntime<O, P>
348{
349    type Pool = P;
350
351    /// 共享运行时内部任务池
352    fn shared_pool(&self) -> Arc<Self::Pool> {
353        (self.0).1.clone()
354    }
355
356    /// 获取当前异步运行时的唯一id
357    fn get_id(&self) -> usize {
358        (self.0).0
359    }
360
361    /// 获取当前异步运行时待处理任务数量
362    fn wait_len(&self) -> usize {
363        (self.0)
364            .4
365            .load(Ordering::Relaxed)
366            .checked_sub((self.0).5.load(Ordering::Relaxed))
367            .unwrap_or(0)
368    }
369
370    /// 获取当前异步运行时任务数量
371    fn len(&self) -> usize {
372        (self.0).1.len()
373    }
374
375    /// 分配异步任务的唯一id
376    fn alloc<R: 'static>(&self) -> TaskId {
377        TaskId(UnsafeCell::new((TaskHandle::<R>::default().into_raw() as u128) << 64 | self.get_id() as u128 & 0xffffffffffffffff))
378    }
379
380    /// 派发一个指定的异步任务到异步运行时
381    fn spawn<F>(&self, future: F) -> Result<TaskId>
382        where
383            F: Future<Output = O> + 'static {
384        let task_id = self.alloc::<F::Output>();
385        if let Err(e) = self.spawn_by_id(task_id.clone(), future) {
386            return Err(e);
387        }
388
389        Ok(task_id)
390    }
391
392    /// 派发一个异步任务到本地异步运行时,如果本地没有本异步运行时,则会派发到当前运行时中
393    fn spawn_local<F>(&self, future: F) -> Result<TaskId>
394        where
395            F: Future<Output = O> + 'static {
396        let task_id = self.alloc::<F::Output>();
397        if let Err(e) = self.spawn_local_by_id(task_id.clone(), future) {
398            return Err(e);
399        }
400
401        Ok(task_id)
402    }
403
404    /// 派发一个指定优先级的异步任务到异步运行时
405    fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
406        where
407            F: Future<Output = O> + 'static {
408        let task_id = self.alloc::<F::Output>();
409        if let Err(e) = self.spawn_priority_by_id(task_id.clone(), priority, future) {
410            return Err(e);
411        }
412
413        Ok(task_id)
414    }
415
416    /// 派发一个异步任务到异步运行时,并立即让出任务的当前运行
417    fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
418        where
419            F: Future<Output = O> + 'static {
420        let task_id = self.alloc::<F::Output>();
421        if let Err(e) = self.spawn_yield_by_id(task_id.clone(), future) {
422            return Err(e);
423        }
424
425        Ok(task_id)
426    }
427
428    /// 派发一个在指定时间后执行的异步任务到异步运行时,时间单位ms
429    fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
430        where
431            F: Future<Output = O> + 'static {
432        let task_id = self.alloc::<F::Output>();
433        if let Err(e) = self.spawn_timing_by_id(task_id.clone(), future, time) {
434            return Err(e);
435        }
436
437        Ok(task_id)
438    }
439
440    /// 派发一个指定任务唯一id的异步任务到异步运行时
441    fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
442        where
443            F: Future<Output = O> + 'static {
444        if let Err(e) = (self.0).1.push(Arc::new(AsyncTask::new(
445            task_id,
446            (self.0).1.clone(),
447            DEFAULT_MAX_LOW_PRIORITY_BOUNDED,
448            Some(future.boxed_local()),
449        ))) {
450            return Err(Error::new(ErrorKind::Other, e));
451        }
452
453        Ok(())
454    }
455
456    /// 派发一个指定任务唯一id的异步任务到本地异步运行时,如果本地没有本异步运行时,则会派发到当前运行时中
457    fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
458        where
459            F: Future<Output = O> + 'static {
460        (self.0).1.push_local(Arc::new(AsyncTask::new(
461            task_id,
462            (self.0).1.clone(),
463            DEFAULT_HIGH_PRIORITY_BOUNDED,
464            Some(future.boxed_local()))))
465    }
466
467    /// 派发一个指定任务唯一id和任务优先级的异步任务到异步运行时
468    fn spawn_priority_by_id<F>(&self,
469                               task_id: TaskId,
470                               priority: usize,
471                               future: F) -> Result<()>
472        where
473            F: Future<Output = O> + 'static {
474        (self.0).1.push_priority(priority, Arc::new(AsyncTask::new(
475            task_id,
476            (self.0).1.clone(),
477            priority,
478            Some(future.boxed_local()))))
479    }
480
481    /// 派发一个指定任务唯一id的异步任务到异步运行时,并立即让出任务的当前运行
482    #[inline]
483    fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
484        where
485            F: Future<Output = O> + 'static {
486        self.spawn_priority_by_id(task_id,
487                                  DEFAULT_HIGH_PRIORITY_BOUNDED,
488                                  future)
489    }
490
491    /// 派发一个指定任务唯一id和在指定时间后执行的异步任务到异步运行时,时间单位ms
492    fn spawn_timing_by_id<F>(&self,
493                             task_id: TaskId,
494                             future: F,
495                             time: usize) -> Result<()>
496        where
497            F: Future<Output = O> + 'static {
498        let rt = self.clone();
499        self.spawn_by_id(task_id, async move {
500            (rt.0).3.set_timer(
501                AsyncTimingTask::WaitRun(Arc::new(AsyncTask::new(
502                    rt.alloc::<F::Output>(),
503                    (rt.0).1.clone(),
504                    DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
505                    Some(future.boxed_local()),
506                ))),
507                time,
508            );
509
510            (rt.0).4.fetch_add(1, Ordering::Relaxed);
511            Default::default()
512        })
513    }
514
515    /// 挂起指定唯一id的异步任务
516    fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output> {
517        task_id.set_waker::<Output>(waker);
518        Poll::Pending
519    }
520
521    /// 唤醒指定唯一id的异步任务
522    fn wakeup<Output: 'static>(&self, task_id: &TaskId) {
523        task_id.wakeup::<Output>();
524    }
525
526    /// 挂起当前异步运行时的当前任务,并在指定的其它运行时上派发一个指定的异步任务,等待其它运行时上的异步任务完成后,唤醒当前运行时的当前任务,并返回其它运行时上的异步任务的值
527    fn wait<V: 'static>(&self) -> AsyncWait<V> {
528        AsyncWait::new(self.wait_any(2))
529    }
530
531    /// 挂起当前异步运行时的当前任务,并在多个其它运行时上执行多个其它任务,其中任意一个任务完成,则唤醒当前运行时的当前任务,并返回这个已完成任务的值,而其它未完成的任务的值将被忽略
532    fn wait_any<V: 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
533        let (producor, consumer) = async_bounded(capacity);
534
535        AsyncWaitAny::new(capacity, producor, consumer)
536    }
537
538    /// 挂起当前异步运行时的当前任务,并在多个其它运行时上执行多个其它任务,任务返回后需要通过用户指定的检查回调进行检查,其中任意一个任务检查通过,则唤醒当前运行时的当前任务,并返回这个已完成任务的值,而其它未完成或未检查通过的任务的值将被忽略,如果所有任务都未检查通过,则强制唤醒当前运行时的当前任务
539    fn wait_any_callback<V: 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
540        let (producor, consumer) = async_bounded(capacity);
541
542        AsyncWaitAnyCallback::new(capacity, producor, consumer)
543    }
544
545    /// 构建用于派发多个异步任务到指定运行时的映射归并,需要指定映射归并的容量
546    fn map_reduce<V: 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
547        let (producor, consumer) = async_bounded(capacity);
548
549        AsyncMapReduce::new(0, capacity, producor, consumer)
550    }
551
552    /// 挂起当前异步运行时的当前任务,等待指定的时间后唤醒当前任务
553    fn timeout(&self, timeout: usize) -> LocalBoxFuture<'static, ()> {
554        let rt = self.clone();
555        let producor = (self.0).2.clone();
556
557        AsyncWaitTimeout::new(rt, producor, timeout).boxed_local()
558    }
559
560    /// 立即让出当前任务的执行
561    fn yield_now(&self) -> LocalBoxFuture<'static, ()> {
562        async move {
563            YieldNow(false).await;
564        }.boxed_local()
565    }
566
567    /// 生成一个异步管道,输入指定流,输入流的每个值通过过滤器生成输出流的值
568    fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> LocalBoxStream<'static, FO>
569    where
570        S: Stream<Item = SO> + 'static,
571        SO: 'static,
572        F: FnMut(SO) -> AsyncPipelineResult<FO> + 'static,
573        FO: 'static,
574    {
575        let output = stream! {
576            for await value in input {
577                match filter(value) {
578                    AsyncPipelineResult::Disconnect => {
579                        //立即中止管道
580                        break;
581                    },
582                    AsyncPipelineResult::Filtered(result) => {
583                        yield result;
584                    },
585                }
586            }
587        };
588
589        output.boxed_local()
590    }
591
592    /// 关闭异步运行时,返回请求关闭是否成功
593    fn close(&self) -> bool {
594        false
595    }
596}
597
598impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>> AsyncRuntimeExt<O>
599    for SingleTaskRuntime<O, P>
600{
601    fn spawn_with_context<F, C>(&self, task_id: TaskId, future: F, context: C) -> Result<()>
602    where
603        F: Future<Output = O> + 'static,
604        C: 'static,
605    {
606        if let Err(e) = (self.0).1.push(Arc::new(AsyncTask::with_context(
607            task_id,
608            (self.0).1.clone(),
609            DEFAULT_MAX_LOW_PRIORITY_BOUNDED,
610            Some(future.boxed_local()),
611            context,
612        ))) {
613            return Err(Error::new(ErrorKind::Other, e));
614        }
615
616        Ok(())
617    }
618
619    fn spawn_timing_with_context<F, C>(
620        &self,
621        task_id: TaskId,
622        future: F,
623        context: C,
624        time: usize,
625    ) -> Result<()>
626    where
627        F: Future<Output = O> + 'static,
628        C: 'static,
629    {
630        let rt = self.clone();
631        self.spawn_by_id(task_id, async move {
632            (rt.0).3.set_timer(
633                AsyncTimingTask::WaitRun(Arc::new(AsyncTask::with_context(
634                    rt.alloc::<F::Output>(),
635                    (rt.0).1.clone(),
636                    DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
637                    Some(future.boxed_local()),
638                    context,
639                ))),
640                time,
641            );
642
643            (rt.0).4.fetch_add(1, Ordering::Relaxed);
644            Default::default()
645        })
646    }
647
648    fn block_on<F>(&self, future: F) -> Result<F::Output>
649    where
650        F: Future + 'static,
651        <F as Future>::Output: Default + 'static {
652        let runner = SingleTaskRunner {
653            is_running: AtomicBool::new(true),
654            runtime: self.clone(),
655            clock: Clock::new(),
656        };
657        let mut result: Option<<F as Future>::Output> = None;
658        let result_raw = (&mut result) as *mut Option<<F as Future>::Output> as usize;
659
660        self.spawn(async move {
661            //在指定运行时中执行,并返回结果
662            let r = future.await;
663            unsafe {
664                *(result_raw as *mut Option<<F as Future>::Output>) = Some(r);
665            }
666
667            Default::default()
668        });
669
670        loop {
671            //执行异步任务
672            while runner.run()? > 0 {}
673
674            //尝试获取异步任务的执行结果
675            if let Some(result) = result.take() {
676                //异步任务已完成,则立即返回执行结果
677                return Ok(result);
678            }
679        }
680    }
681}
682
683impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>
684    SingleTaskRuntime<O, P>
685{
686    /// 获取当前单线程异步运行时的本地异步运行时
687    pub fn to_local_runtime(&self) -> LocalAsyncRuntime<O> {
688        LocalAsyncRuntime::new(
689            self.as_raw(),
690            SingleTaskRuntime::<O, P>::get_id_raw,
691            SingleTaskRuntime::<O, P>::spawn_raw,
692            SingleTaskRuntime::<O, P>::spawn_timing_raw,
693            SingleTaskRuntime::<O, P>::timeout_raw,
694        )
695    }
696
697    // 获取当前单线程异步运行时的指针
698    #[inline]
699    pub(crate) fn as_raw(&self) -> *const () {
700        Arc::into_raw(self.0.clone()) as *const ()
701    }
702
703    // 获取指定指针的单线程异步运行时
704    #[inline]
705    pub(crate) fn from_raw(raw: *const ()) -> Self {
706        let inner = unsafe {
707            Arc::from_raw(
708                raw as *const (
709                    usize,
710                    Arc<P>,
711                    Sender<(usize, AsyncTimingTask<P, O>)>,
712                    AsyncTaskTimer<P, O>,
713                    AtomicUsize,
714                    AtomicUsize,
715                ),
716            )
717        };
718        SingleTaskRuntime(inner)
719    }
720
721    // 获取当前异步运行时的唯一id
722    pub(crate) fn get_id_raw(raw: *const ()) -> usize {
723        let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
724        let id = rt.get_id();
725        Arc::into_raw(rt.0); //避免提前释放
726        id
727    }
728
729    // 派发一个指定的异步任务到异步运行时
730    pub(crate) fn spawn_raw(raw: *const (), future: LocalBoxFuture<'static, O>) -> Result<()> {
731        let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
732        let result = rt.spawn_by_id(rt.alloc::<O>(), future);
733        Arc::into_raw(rt.0); //避免提前释放
734        result
735    }
736
737    // 定时派发一个指定的异步任务到异步运行时
738    pub(crate) fn spawn_timing_raw(
739        raw: *const (),
740        future: LocalBoxFuture<'static, O>,
741        timeout: usize,
742    ) -> Result<()> {
743        let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
744        let result = rt.spawn_timing_by_id(rt.alloc::<O>(), future, timeout);
745        Arc::into_raw(rt.0); //避免提前释放
746        result
747    }
748
749    // 挂起当前异步运行时的当前任务,等待指定的时间后唤醒当前任务
750    pub(crate) fn timeout_raw(raw: *const (), timeout: usize) -> LocalBoxFuture<'static, ()> {
751        let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
752        let boxed = rt.timeout(timeout);
753        Arc::into_raw(rt.0); //避免提前释放
754        boxed
755    }
756}
757
758///
759/// 单线程异步任务执行器
760///
761pub struct SingleTaskRunner<
762    O: Default + 'static,
763    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = SingleTaskPool<O>,
764> {
765    is_running: AtomicBool,                 //是否开始运行
766    runtime:    SingleTaskRuntime<O, P>,    //异步单线程任务运行时
767    clock:      Clock,                      //执行器的时钟
768}
769
770unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Send
771    for SingleTaskRunner<O, P>
772{
773}
774unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Sync
775    for SingleTaskRunner<O, P>
776{
777}
778
779impl<O: Default + 'static> Default for SingleTaskRunner<O> {
780    fn default() -> Self {
781        SingleTaskRunner::new(SingleTaskPool::default())
782    }
783}
784
785impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>
786    SingleTaskRunner<O, P>
787{
788    /// 用指定的任务池构建单线程异步运行时
789    pub fn new(pool: P) -> Self {
790        let rt_uid = pool.get_thread_id() >> 32;
791        let pool = Arc::new(pool);
792
793        //构建本地定时器和定时异步任务生产者
794        let timer = AsyncTaskTimer::new();
795        let producor = timer.get_producor().clone();
796        let timer_producor_count = AtomicUsize::new(0);
797        let timer_consume_count = AtomicUsize::new(0);
798
799        //构建单线程任务运行时
800        let runtime = SingleTaskRuntime(Arc::new((rt_uid,
801                                                  pool,
802                                                  producor,
803                                                  timer,
804                                                  timer_producor_count,
805                                                  timer_consume_count)));
806
807        SingleTaskRunner {
808            is_running: AtomicBool::new(false),
809            runtime,
810            clock: Clock::new(),
811        }
812    }
813
814    /// 获取单线程异步任务执行器的线程唤醒器
815    pub fn get_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
816        (self.runtime.0).1.get_thread_waker().cloned()
817    }
818
819    /// 启动单线程异步任务执行器
820    pub fn startup(&self) -> Option<SingleTaskRuntime<O, P>> {
821        if cfg!(target_arch = "aarch64") {
822            match self
823                .is_running
824                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
825            {
826                Ok(false) => {
827                    //未启动,则启动,并返回单线程异步运行时
828                    Some(self.runtime.clone())
829                }
830                _ => {
831                    //已启动,则忽略
832                    None
833                }
834            }
835        } else {
836            match self.is_running.compare_exchange_weak(
837                false,
838                true,
839                Ordering::SeqCst,
840                Ordering::SeqCst,
841            ) {
842                Ok(false) => {
843                    //未启动,则启动,并返回单线程异步运行时
844                    Some(self.runtime.clone())
845                }
846                _ => {
847                    //已启动,则忽略
848                    None
849                }
850            }
851        }
852    }
853
854    /// 运行一次单线程异步任务执行器,返回当前任务池中任务的数量
855    pub fn run_once(&self) -> Result<usize> {
856        if !self.is_running.load(Ordering::Relaxed) {
857            //未启动,则返回错误原因
858            return Err(Error::new(
859                ErrorKind::Other,
860                "Single thread runtime not running",
861            ));
862        }
863
864        //设置新的定时任务,并唤醒已过期的定时任务
865        let mut pop_len = 0;
866        (self.runtime.0)
867            .4
868            .fetch_add((self.runtime.0).3.consume(),
869                       Ordering::Relaxed);
870        loop {
871            let current_time = (self.runtime.0).3.is_require_pop();
872            if let Some(current_time) = current_time {
873                //当前有到期的定时异步任务,则只处理到期的一个定时异步任务
874                let timed_out = (self.runtime.0).3.pop(current_time);
875                if let Some((_handle, timing_task)) = timed_out {
876                    match timing_task {
877                        AsyncTimingTask::Pended(expired) => {
878                            //唤醒休眠的异步任务,并立即执行
879                            self.runtime.wakeup::<O>(&expired);
880                            if let Some(task) = (self.runtime.0).1.try_pop() {
881                                run_task(task);
882                            }
883                        }
884                        AsyncTimingTask::WaitRun(expired) => {
885                            //立即执行到期的定时异步任务,并立即执行
886                            (self.runtime.0).1.push_priority(DEFAULT_MAX_HIGH_PRIORITY_BOUNDED, expired);
887                            if let Some(task) = (self.runtime.0).1.try_pop() {
888                                run_task(task);
889                            }
890                        }
891                    }
892                    pop_len += 1;
893                }
894            } else {
895                //当前没有到期的定时异步任务,则退出本次定时异步任务处理
896                break;
897            }
898        }
899        (self.runtime.0)
900            .5
901            .fetch_add(pop_len,
902                       Ordering::Relaxed);
903
904        //继续执行当前任务池中的一个异步任务
905        match (self.runtime.0).1.try_pop() {
906            None => {
907                //当前没有异步任务,则立即返回
908                return Ok(0);
909            }
910            Some(task) => {
911                run_task(task);
912            }
913        }
914
915        Ok((self.runtime.0).1.len())
916    }
917
918    /// 运行单线程异步任务执行器,并执行任务池中的所有任务
919    pub fn run(&self) -> Result<usize> {
920        if !self.is_running.load(Ordering::Relaxed) {
921            //未启动,则返回错误原因
922            return Err(Error::new(
923                ErrorKind::Other,
924                "Single thread runtime not running",
925            ));
926        }
927
928        loop {
929            //设置新的定时任务,并唤醒已过期的定时任务
930            let mut pop_len = 0;
931            let mut start_run_millis = self.clock.recent(); //重置开运行时长
932            (self.runtime.0)
933                .4
934                .fetch_add((self.runtime.0).3.consume(),
935                           Ordering::Relaxed);
936            loop {
937                let current_time = (self.runtime.0).3.is_require_pop();
938                if let Some(current_time) = current_time {
939                    //当前有到期的定时异步任务,则只处理到期的一个定时异步任务
940                    let timed_out = (self.runtime.0).3.pop(current_time);
941                    if let Some((handle, timing_task)) = timed_out {
942                        match timing_task {
943                            AsyncTimingTask::Pended(expired) => {
944                                //唤醒休眠的异步任务,并立即执行
945                                self.runtime.wakeup::<O>(&expired);
946                                if let Some(task) = (self.runtime.0).1.try_pop() {
947                                    run_task(task);
948                                }
949                            }
950                            AsyncTimingTask::WaitRun(expired) => {
951                                //立即执行到期的定时异步任务,并立即执行
952                                (self.runtime.0).1.push_priority(handle, expired);
953                                if let Some(task) = (self.runtime.0).1.try_pop() {
954                                    run_task(task);
955                                }
956                            }
957                        }
958                        pop_len += 1;
959                    }
960                } else {
961                    //当前没有到期的定时异步任务,则退出本次定时异步任务处理
962                    break;
963                }
964            }
965            (self.runtime.0)
966                .5
967                .fetch_add(pop_len,
968                           Ordering::Relaxed);
969
970            //继续执行当前任务池中的一个异步任务
971            while self
972                .clock
973                .recent()
974                .duration_since(start_run_millis)
975                .as_millis() < 1 {
976                match (self.runtime.0).1.try_pop() {
977                    None => {
978                        //当前没有异步任务,则立即返回
979                        return Ok((self.runtime.0).1.len());
980                    }
981                    Some(task) => {
982                        run_task(task);
983                    }
984                }
985            }
986        }
987    }
988
989    /// 转换为本地异步单线程任务运行时
990    pub fn into_local(self) -> SingleTaskRuntime<O, P> {
991        self.runtime
992    }
993}
994
995//执行异步任务
996#[inline]
997fn run_task<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>(
998    task: Arc<AsyncTask<P, O>>,
999) {
1000    let waker = waker_ref(&task);
1001    let mut context = Context::from_waker(&*waker);
1002    if let Some(mut future) = task.get_inner() {
1003        if let Poll::Pending = future.as_mut().poll(&mut context) {
1004            //当前未准备好,则恢复异步任务,以保证异步服务后续访问异步任务和异步任务不被提前释放
1005            task.set_inner(Some(future));
1006        }
1007    }
1008}