pi_async_rt/rt/
serial_worker_thread.rs

1use std::future::Future;
2use std::time::Duration;
3use std::task::{Poll, Waker};
4use std::io::{Error, Result, ErrorKind};
5use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
6
7use parking_lot::{Mutex, Condvar};
8use futures::{future::LocalBoxFuture,
9              stream::{Stream, LocalBoxStream}};
10
11use crate::rt::{TaskId, AsyncPipelineResult,
12                serial::{AsyncRuntime,
13                         AsyncRuntimeExt,
14                         AsyncTaskPool,
15                         AsyncTaskPoolExt,
16                         AsyncWait,
17                         AsyncWaitAny,
18                         AsyncWaitAnyCallback,
19                         AsyncMapReduce,
20                         LocalAsyncRuntime,
21                         spawn_worker_thread, wakeup_worker_thread},
22                serial_single_thread::{SingleTaskPool, SingleTaskRunner, SingleTaskRuntime}};
23
24///
25/// 工作者异步运行时
26///
27pub struct WorkerRuntime<
28    O: Default + 'static = (),
29    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = SingleTaskPool<O>,
30>(Arc<(
31    Arc<AtomicBool>,                        //工作者状态
32    Arc<(AtomicBool, Mutex<()>, Condvar)>,  //工作者线程唤醒器
33    SingleTaskRuntime<O, P>                 //单线程运行时
34)>);
35
36unsafe impl<
37    O: Default + 'static,
38    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
39> Send for WorkerRuntime<O, P> {}
40unsafe impl<
41    O: Default + 'static,
42    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
43> Sync for WorkerRuntime<O, P> {}
44
45impl<
46    O: Default + 'static,
47    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
48> Clone for WorkerRuntime<O, P> {
49    fn clone(&self) -> Self {
50        WorkerRuntime(self.0.clone())
51    }
52}
53
54impl<
55    O: Default + 'static,
56    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
57> AsyncRuntime<O> for WorkerRuntime<O, P> {
58    type Pool = P;
59
60    /// 共享运行时内部任务池
61    #[inline]
62    fn shared_pool(&self) -> Arc<Self::Pool> {
63        (self.0).2.shared_pool()
64    }
65
66    /// 获取当前异步运行时的唯一id
67    #[inline]
68    fn get_id(&self) -> usize {
69        (self.0).2.get_id()
70    }
71
72    /// 获取当前异步运行时待处理任务数量
73    #[inline]
74    fn wait_len(&self) -> usize {
75        (self.0).2.wait_len()
76    }
77
78    /// 获取当前异步运行时任务数量
79    #[inline]
80    fn len(&self) -> usize {
81        (self.0).2.len()
82    }
83
84    /// 分配异步任务的唯一id
85    #[inline]
86    fn alloc<R: 'static>(&self) -> TaskId {
87        (self.0).2.alloc::<R>()
88    }
89
90    /// 派发一个指定的异步任务到异步运行时
91    fn spawn<F>(&self, future: F) -> Result<TaskId>
92        where F: Future<Output = O> + 'static {
93        if !(self.0).0.load(Ordering::SeqCst) {
94            return Err(Error::new(ErrorKind::Other, "Spawn async task failed, reason: worker already closed"));
95        }
96
97        let result = (self.0).2.spawn(future);
98        wakeup_worker_thread(&(self.0).1, &(self.0).2);
99        result
100    }
101
102    /// 派发一个异步任务到本地异步运行时,如果本地没有本异步运行时,则会派发到当前运行时中
103    fn spawn_local<F>(&self, future: F) -> Result<TaskId>
104        where
105            F: Future<Output = O> + 'static {
106        if !(self.0).0.load(Ordering::SeqCst) {
107            return Err(Error::new(ErrorKind::Other, "Spawn local async task failed, reason: worker already closed"));
108        }
109
110        let result = (self.0).2.spawn_local(future);
111        wakeup_worker_thread(&(self.0).1, &(self.0).2);
112        result
113    }
114
115    /// 派发一个指定优先级的异步任务到异步运行时
116    fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
117        where
118            F: Future<Output = O> + 'static {
119        if !(self.0).0.load(Ordering::SeqCst) {
120            return Err(Error::new(ErrorKind::Other, "Spawn priority async task failed, reason: worker already closed"));
121        }
122
123        let result = (self.0).2.spawn_priority(priority, future);
124        wakeup_worker_thread(&(self.0).1, &(self.0).2);
125        result
126    }
127
128    /// 派发一个异步任务到异步运行时,并立即让出任务的当前运行
129    fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
130        where
131            F: Future<Output = O> + 'static {
132        if !(self.0).0.load(Ordering::SeqCst) {
133            return Err(Error::new(ErrorKind::Other, "Spawn yield priority async task failed, reason: worker already closed"));
134        }
135
136        let result = (self.0).2.spawn_yield(future);
137        wakeup_worker_thread(&(self.0).1, &(self.0).2);
138        result
139    }
140
141    /// 派发一个在指定时间后执行的异步任务到异步运行时,时间单位ms
142    fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
143        where F: Future<Output = O> + 'static {
144        if !(self.0).0.load(Ordering::SeqCst) {
145            return Err(Error::new(ErrorKind::Other, "Spawn timing async task failed, reason: worker already closed"));
146        }
147
148        let result = (self.0).2.spawn_timing(future, time);
149        wakeup_worker_thread(&(self.0).1, &(self.0).2);
150        result
151    }
152
153    /// 派发一个指定任务唯一id的异步任务到异步运行时
154    fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
155        where
156            F: Future<Output=O> + 'static {
157        if !(self.0).0.load(Ordering::SeqCst) {
158            return Err(Error::new(ErrorKind::Other, "Spawn async task by id failed, reason: worker already closed"));
159        }
160
161        let result = (self.0).2.spawn_by_id(task_id, future);
162        wakeup_worker_thread(&(self.0).1, &(self.0).2);
163        result
164    }
165
166    /// 派发一个指定任务唯一id的异步任务到本地异步运行时,如果本地没有本异步运行时,则会派发到当前运行时中
167    fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
168        where
169            F: Future<Output=O> + 'static {
170        if !(self.0).0.load(Ordering::SeqCst) {
171            return Err(Error::new(ErrorKind::Other, "Spawn local async task by id failed, reason: worker already closed"));
172        }
173
174        let result = (self.0).2.spawn_local_by_id(task_id, future);
175        wakeup_worker_thread(&(self.0).1, &(self.0).2);
176        result
177    }
178
179    /// 派发一个指定任务唯一id和任务优先级的异步任务到异步运行时
180    fn spawn_priority_by_id<F>(&self,
181                               task_id: TaskId,
182                               priority: usize,
183                               future: F) -> Result<()>
184        where
185            F: Future<Output=O> + 'static {
186        if !(self.0).0.load(Ordering::SeqCst) {
187            return Err(Error::new(ErrorKind::Other, "Spawn priority async task by id failed, reason: worker already closed"));
188        }
189
190        let result = (self.0).2.spawn_priority_by_id(task_id, priority, future);
191        wakeup_worker_thread(&(self.0).1, &(self.0).2);
192        result
193    }
194
195    /// 派发一个指定任务唯一id的异步任务到异步运行时,并立即让出任务的当前运行
196    fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
197        where
198            F: Future<Output=O> + 'static {
199        if !(self.0).0.load(Ordering::SeqCst) {
200            return Err(Error::new(ErrorKind::Other, "Spawn yield async task by id failed, reason: worker already closed"));
201        }
202
203        let result = (self.0).2.spawn_yield_by_id(task_id, future);
204        wakeup_worker_thread(&(self.0).1, &(self.0).2);
205        result
206    }
207
208    /// 派发一个指定任务唯一id和在指定时间后执行的异步任务到异步运行时,时间单位ms
209    fn spawn_timing_by_id<F>(&self,
210                             task_id: TaskId,
211                             future: F,
212                             time: usize) -> Result<()>
213        where
214            F: Future<Output=O> + 'static {
215        if !(self.0).0.load(Ordering::SeqCst) {
216            return Err(Error::new(ErrorKind::Other, "Spawn timing async task by id failed, reason: worker already closed"));
217        }
218
219        let result = (self.0).2.spawn_timing_by_id(task_id, future, time);
220        wakeup_worker_thread(&(self.0).1, &(self.0).2);
221        result
222    }
223
224    /// 挂起指定唯一id的异步任务
225    #[inline]
226    fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output> {
227        (self.0).2.pending::<Output>(task_id, waker)
228    }
229
230    /// 唤醒指定唯一id的异步任务
231    #[inline]
232    fn wakeup<Output: 'static>(&self, task_id: &TaskId) {
233        (self.0).2.wakeup::<Output>(task_id);
234    }
235
236    /// 挂起当前异步运行时的当前任务,并在指定的其它运行时上派发一个指定的异步任务,等待其它运行时上的异步任务完成后,唤醒当前运行时的当前任务,并返回其它运行时上的异步任务的值
237    #[inline]
238    fn wait<V: 'static>(&self) -> AsyncWait<V> {
239        (self.0).2.wait()
240    }
241
242    /// 挂起当前异步运行时的当前任务,并在多个其它运行时上执行多个其它任务,其中任意一个任务完成,则唤醒当前运行时的当前任务,并返回这个已完成任务的值,而其它未完成的任务的值将被忽略
243    #[inline]
244    fn wait_any<V: 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
245        (self.0).2.wait_any(capacity)
246    }
247
248    /// 挂起当前异步运行时的当前任务,并在多个其它运行时上执行多个其它任务,任务返回后需要通过用户指定的检查回调进行检查,其中任意一个任务检查通过,则唤醒当前运行时的当前任务,并返回这个已完成任务的值,而其它未完成或未检查通过的任务的值将被忽略,如果所有任务都未检查通过,则强制唤醒当前运行时的当前任务
249    #[inline]
250    fn wait_any_callback<V: 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
251        (self.0).2.wait_any_callback(capacity)
252    }
253
254    /// 构建用于派发多个异步任务到指定运行时的映射归并,需要指定映射归并的容量
255    #[inline]
256    fn map_reduce<V: 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
257        (self.0).2.map_reduce(capacity)
258    }
259
260    /// 挂起当前异步运行时的当前任务,等待指定的时间后唤醒当前任务
261    #[inline]
262    fn timeout(&self, timeout: usize) -> LocalBoxFuture<'static, ()> {
263        (self.0).2.timeout(timeout)
264    }
265
266    /// 立即让出当前任务的执行
267    #[inline]
268    fn yield_now(&self) -> LocalBoxFuture<'static, ()> {
269        (self.0).2.yield_now()
270    }
271
272    /// 生成一个异步管道,输入指定流,输入流的每个值通过过滤器生成输出流的值
273    #[inline]
274    fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> LocalBoxStream<'static, FO>
275        where S: Stream<Item = SO> + 'static,
276              SO: 'static,
277              F: FnMut(SO) -> AsyncPipelineResult<FO> + 'static,
278              FO: 'static {
279        (self.0).2.pipeline(input, filter)
280    }
281
282    /// 关闭异步运行时,返回请求关闭是否成功
283    fn close(&self) -> bool {
284        if cfg!(target_arch = "aarch64") {
285            if let Ok(true) = (self.0).0.compare_exchange(true,
286                                                          false,
287                                                          Ordering::SeqCst,
288                                                          Ordering::SeqCst) {
289                //设置工作者状态成功,检查运行时所在线程是否需要唤醒
290                wakeup_worker_thread(&(self.0).1, &(self.0).2);
291                true
292            } else {
293                false
294            }
295        } else {
296            if let Ok(true) = (self.0).0.compare_exchange_weak(true,
297                                                               false,
298                                                               Ordering::SeqCst,
299                                                               Ordering::SeqCst) {
300                //设置工作者状态成功,检查运行时所在线程是否需要唤醒
301                wakeup_worker_thread(&(self.0).1, &(self.0).2);
302                true
303            } else {
304                false
305            }
306        }
307    }
308}
309
310impl<
311    O: Default + 'static,
312    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
313> AsyncRuntimeExt<O> for WorkerRuntime<O, P> {
314    #[inline]
315    fn spawn_with_context<F, C>(&self,
316                                task_id: TaskId,
317                                future: F,
318                                context: C) -> Result<()>
319        where F: Future<Output = O> + 'static,
320              C: 'static {
321        (self.0).2.spawn_with_context(task_id, future, context)
322    }
323
324    #[inline]
325    fn spawn_timing_with_context<F, C>(&self,
326                                       task_id: TaskId,
327                                       future: F,
328                                       context: C,
329                                       time: usize) -> Result<()>
330        where F: Future<Output = O> + 'static,
331              C: 'static {
332        (self.0).2.spawn_timing_with_context(task_id, future, context, time)
333    }
334
335    #[inline]
336    fn block_on<F>(&self, future: F) -> Result<F::Output>
337        where F: Future + 'static,
338              <F as Future>::Output: Default + 'static {
339        (self.0).2.block_on::<F>(future)
340    }
341}
342
343impl<
344    O: Default + 'static,
345    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
346> WorkerRuntime<O, P> {
347    /// 获取工作者异步运行时的工作者状态
348    pub fn get_worker_status(&self) -> &Arc<AtomicBool> {
349        &(self.0).0
350    }
351
352    /// 获取工作者异步运行时的工作者线程状态
353    pub fn get_worker_waker(&self) -> &Arc<(AtomicBool, Mutex<()>, Condvar)> {
354        &(self.0).1
355    }
356
357    /// 获取工作者异步运行时的单线程异步运行时
358    pub fn get_worker_runtime(&self) -> &SingleTaskRuntime<O, P> {
359        &(self.0).2
360    }
361
362    /// 获取当前工作者异步运行时的本地异步运行时
363    pub fn to_local_runtime(&self) -> LocalAsyncRuntime<O> {
364        LocalAsyncRuntime::new(
365            self.as_raw(),
366            WorkerRuntime::<O, P>::get_id_raw,
367            WorkerRuntime::<O, P>::spawn_raw,
368            WorkerRuntime::<O, P>::spawn_timing_raw,
369            WorkerRuntime::<O, P>::timeout_raw
370        )
371    }
372
373    // 获取当前工作者异步运行时的指针
374    #[inline]
375    pub(crate) fn as_raw(&self) -> *const () {
376        Arc::into_raw(self.0.clone()) as *const ()
377    }
378
379    // 获取指定指针的工作者异步运行时
380    #[inline]
381    pub(crate) fn from_raw(raw: *const ()) -> Self {
382        let inner = unsafe {
383            Arc::from_raw(raw as *const (
384                Arc<AtomicBool>,
385                Arc<(AtomicBool, Mutex<()>, Condvar)>,
386                SingleTaskRuntime<O, P>),
387            )
388        };
389        WorkerRuntime(inner)
390    }
391
392    // 获取当前异步运行时的唯一id
393    pub(crate) fn get_id_raw(raw: *const ()) -> usize {
394        let rt = WorkerRuntime::<O, P>::from_raw(raw);
395        let id = rt.get_id();
396        Arc::into_raw(rt.0); //避免提前释放
397        id
398    }
399
400    // 派发一个指定的异步任务到异步运行时
401    pub(crate) fn spawn_raw(raw: *const (),
402                            future: LocalBoxFuture<'static, O>) -> Result<()> {
403        let rt = WorkerRuntime::<O, P>::from_raw(raw);
404        let result = rt.spawn_by_id(rt.alloc::<O>(), future);
405        Arc::into_raw(rt.0); //避免提前释放
406        result
407    }
408
409    // 定时派发一个指定的异步任务到异步运行时
410    pub(crate) fn spawn_timing_raw(raw: *const (),
411                                   future: LocalBoxFuture<'static, O>,
412                                   timeout: usize) -> Result<()> {
413        let rt = WorkerRuntime::<O, P>::from_raw(raw);
414        let result = rt.spawn_timing_by_id(rt.alloc::<O>(), future, timeout);
415        Arc::into_raw(rt.0); //避免提前释放
416        result
417    }
418
419    // 挂起当前异步运行时的当前任务,等待指定的时间后唤醒当前任务
420    pub(crate) fn timeout_raw(raw: *const (),
421                              timeout: usize) -> LocalBoxFuture<'static, ()> {
422        let rt = WorkerRuntime::<O, P>::from_raw(raw);
423        let boxed = rt.timeout(timeout);
424        Arc::into_raw(rt.0); //避免提前释放
425        boxed
426    }
427}
428
429///
430/// 工作者任务执行器
431///
432pub struct WorkerTaskRunner<
433    O: Default + 'static = (),
434    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P> = SingleTaskPool<O>,
435>(Arc<(
436    Arc<AtomicBool>,                        //工作者状态
437    Arc<(AtomicBool, Mutex<()>, Condvar)>,  //工作者线程唤醒器
438    SingleTaskRunner<O, P>,                 //单线程异步任务执行器
439    WorkerRuntime<O, P>,                    //工作者运行时
440)>);
441
442unsafe impl<
443    O: Default + 'static,
444    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
445> Send for WorkerTaskRunner<O, P> {}
446unsafe impl<
447    O: Default + 'static,
448    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
449> Sync for WorkerTaskRunner<O, P> {}
450
451impl<
452    O: Default + 'static,
453    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
454> Clone for WorkerTaskRunner<O, P> {
455    fn clone(&self) -> Self {
456        WorkerTaskRunner(self.0.clone())
457    }
458}
459
460impl<
461    O: Default + 'static,
462    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
463> From<(Arc<AtomicBool>, Arc<(AtomicBool, Mutex<()>, Condvar)>, SingleTaskRuntime<O, P>)> for WorkerRuntime<O, P> {
464    //将外部的工作者状态,工作者线程唤醒器和指定任务池的单线程异步运行时转换成工作者异步运行时
465    fn from(from: (Arc<AtomicBool>,
466                   Arc<(AtomicBool, Mutex<()>, Condvar)>,
467                   SingleTaskRuntime<O, P>,)) -> Self {
468        WorkerRuntime(Arc::new(from))
469    }
470}
471
472impl<O: Default + 'static> Default for WorkerTaskRunner<O> {
473    fn default() -> Self {
474        WorkerTaskRunner::new(SingleTaskPool::default(),
475                              Arc::new(AtomicBool::new(true)),
476                              Arc::new((AtomicBool::new(false), Mutex::new(()), Condvar::new())))
477    }
478}
479
480impl<
481    O: Default + 'static,
482    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
483> WorkerTaskRunner<O, P> {
484    /// 用指定的任务池构建工作者任务执行器
485    pub fn new(pool: P,
486               worker_status: Arc<AtomicBool>,
487               worker_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) -> Self {
488        let runner = SingleTaskRunner::new(pool);
489        let rt = runner.startup().unwrap();
490        let inner = (worker_status.clone(), worker_waker.clone(), rt);
491        let runtime = WorkerRuntime(Arc::new(inner));
492
493        let inner = (worker_status,
494                     worker_waker,
495                     runner,
496                     runtime);
497
498        WorkerTaskRunner(Arc::new(inner))
499    }
500
501    /// 获取当前工作者异步任务执行器的工作者运行时
502    pub fn get_runtime(&self) -> WorkerRuntime<O, P> {
503        (self.0).3.clone()
504    }
505
506    /// 运行一次工作者异步任务执行器,返回当前任务池中任务的数量
507    #[inline]
508    pub fn run_once(&self) -> Result<usize> {
509        (self.0).2.run_once()
510    }
511
512    /// 运行单线程异步任务执行器,并执行任务池中的所有任务
513    #[inline]
514    pub fn run(&self) -> Result<usize> {
515        (self.0).2.run()
516    }
517
518    /// 启动工作者异步任务执行器
519    pub fn startup<LF, GQL>(self,
520                            thread_name: &str,
521                            thread_stack_size: usize,
522                            sleep_timeout: u64,
523                            loop_interval: Option<u64>,
524                            loop_func: LF,
525                            get_queue_len: GQL) -> WorkerRuntime<O, P>
526        where P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
527              LF: Fn() -> (bool, Duration) + Send + 'static,
528              GQL: Fn() -> usize + Send + 'static{
529        let rt_copy = (self.0).3.clone();
530        let thread_handler = (self.0).0.clone();
531        let thread_waker = (self.0).1.clone();
532        spawn_worker_thread(
533            thread_name,
534            thread_stack_size,
535            thread_handler,
536            thread_waker,
537            sleep_timeout,
538            loop_interval,
539            loop_func,
540            move || {
541                rt_copy.wait_len() + get_queue_len()
542            },
543        );
544
545        (self.0).3.clone()
546    }
547}