pi_async_rt/rt/
worker_thread.rs

1use std::future::Future;
2use std::time::Duration;
3use std::task::{Poll, Waker};
4use std::io::{Error, Result, ErrorKind};
5use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
6
7use parking_lot::{Mutex, Condvar};
8use futures::{future::BoxFuture,
9              stream::{Stream, BoxStream}};
10
11use crate::rt::{TaskId,
12                AsyncRuntime,
13                AsyncRuntimeExt,
14                AsyncTaskPool,
15                AsyncTaskPoolExt,
16                AsyncWait,
17                AsyncWaitAny,
18                AsyncWaitAnyCallback,
19                AsyncMapReduce,
20                AsyncPipelineResult,
21                LocalAsyncRuntime,
22                single_thread::{SingleTaskPool, SingleTaskRunner, SingleTaskRuntime},
23                spawn_worker_thread, wakeup_worker_thread};
24
25///
26/// 工作者异步运行时
27///
28pub struct WorkerRuntime<
29    O: Default + 'static = (),
30    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = SingleTaskPool<O>,
31>(Arc<(
32    Arc<AtomicBool>,                        //工作者状态
33    Arc<(AtomicBool, Mutex<()>, Condvar)>,  //工作者线程唤醒器
34    SingleTaskRuntime<O, P>                 //单线程运行时
35)>);
36
37unsafe impl<
38    O: Default + 'static,
39    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
40> Send for WorkerRuntime<O, P> {}
41unsafe impl<
42    O: Default + 'static,
43    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
44> Sync for WorkerRuntime<O, P> {}
45
46impl<
47    O: Default + 'static,
48    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
49> Clone for WorkerRuntime<O, P> {
50    fn clone(&self) -> Self {
51        WorkerRuntime(self.0.clone())
52    }
53}
54
55impl<
56    O: Default + 'static,
57    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
58> AsyncRuntime<O> for WorkerRuntime<O, P> {
59    type Pool = P;
60
61    /// 共享运行时内部任务池
62    #[inline]
63    fn shared_pool(&self) -> Arc<Self::Pool> {
64        (self.0).2.shared_pool()
65    }
66
67    /// 获取当前异步运行时的唯一id
68    #[inline]
69    fn get_id(&self) -> usize {
70        (self.0).2.get_id()
71    }
72
73    /// 获取当前异步运行时待处理任务数量
74    #[inline]
75    fn wait_len(&self) -> usize {
76        (self.0).2.wait_len()
77    }
78
79    /// 获取当前异步运行时任务数量
80    #[inline]
81    fn len(&self) -> usize {
82        (self.0).2.len()
83    }
84
85    /// 分配异步任务的唯一id
86    #[inline]
87    fn alloc<R: 'static>(&self) -> TaskId {
88        (self.0).2.alloc::<R>()
89    }
90
91    /// 派发一个指定的异步任务到异步运行时
92    fn spawn<F>(&self, future: F) -> Result<TaskId>
93        where
94            F: Future<Output = O> + Send + 'static {
95        if !(self.0).0.load(Ordering::SeqCst) {
96            return Err(Error::new(ErrorKind::Other, "Spawn async task failed, reason: worker already closed"));
97        }
98
99        let result = (self.0).2.spawn(future);
100        wakeup_worker_thread(&(self.0).1, &(self.0).2);
101        result
102    }
103
104    /// 派发一个异步任务到本地异步运行时,如果本地没有本异步运行时,则会派发到当前运行时中
105    fn spawn_local<F>(&self, future: F) -> Result<TaskId>
106        where
107            F: Future<Output=O> + Send + 'static {
108        if !(self.0).0.load(Ordering::SeqCst) {
109            return Err(Error::new(ErrorKind::Other, "Spawn local async task failed, reason: worker already closed"));
110        }
111
112        let result = (self.0).2.spawn_local(future);
113        wakeup_worker_thread(&(self.0).1, &(self.0).2);
114        result
115    }
116
117    /// 派发一个指定优先级的异步任务到异步运行时
118    fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
119        where
120            F: Future<Output=O> + Send + 'static {
121        if !(self.0).0.load(Ordering::SeqCst) {
122            return Err(Error::new(ErrorKind::Other, "Spawn priority async task failed, reason: worker already closed"));
123        }
124
125        let result = (self.0).2.spawn_priority(priority, future);
126        wakeup_worker_thread(&(self.0).1, &(self.0).2);
127        result
128    }
129
130    /// 派发一个异步任务到异步运行时,并立即让出任务的当前运行
131    fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
132        where
133            F: Future<Output=O> + Send + 'static {
134        if !(self.0).0.load(Ordering::SeqCst) {
135            return Err(Error::new(ErrorKind::Other, "Spawn yield async task failed, reason: worker already closed"));
136        }
137
138        let result = (self.0).2.spawn_yield(future);
139        wakeup_worker_thread(&(self.0).1, &(self.0).2);
140        result
141    }
142
143    /// 派发一个在指定时间后执行的异步任务到异步运行时,时间单位ms
144    fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
145        where
146            F: Future<Output = O> + Send + 'static {
147        if !(self.0).0.load(Ordering::SeqCst) {
148            return Err(Error::new(ErrorKind::Other, "Spawn timing async task failed, reason: worker already closed"));
149        }
150
151        let result = (self.0).2.spawn_timing(future, time);
152        wakeup_worker_thread(&(self.0).1, &(self.0).2);
153        result
154    }
155
156    /// 派发一个指定任务唯一id的异步任务到异步运行时
157    fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
158        where
159            F: Future<Output=O> + Send + 'static {
160            if !(self.0).0.load(Ordering::SeqCst) {
161                return Err(Error::new(ErrorKind::Other, "Spawn async task by id failed, reason: worker already closed"));
162            }
163
164            let result = (self.0).2.spawn_by_id(task_id, future);
165            wakeup_worker_thread(&(self.0).1, &(self.0).2);
166            result
167    }
168
169    /// 派发一个指定任务唯一id的异步任务到本地异步运行时,如果本地没有本异步运行时,则会派发到当前运行时中
170    fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
171        where
172            F: Future<Output=O> + Send + 'static {
173        if !(self.0).0.load(Ordering::SeqCst) {
174            return Err(Error::new(ErrorKind::Other, "Spawn local async task by id failed, reason: worker already closed"));
175        }
176
177        let result = (self.0).2.spawn_local_by_id(task_id, future);
178        wakeup_worker_thread(&(self.0).1, &(self.0).2);
179        result
180    }
181
182    /// 派发一个指定任务唯一id和任务优先级的异步任务到异步运行时
183    fn spawn_priority_by_id<F>(&self,
184                               task_id: TaskId,
185                               priority: usize,
186                               future: F) -> Result<()>
187        where
188            F: Future<Output=O> + Send + 'static {
189        if !(self.0).0.load(Ordering::SeqCst) {
190            return Err(Error::new(ErrorKind::Other, "Spawn priority async task by id failed, reason: worker already closed"));
191        }
192
193        let result = (self.0).2.spawn_priority_by_id(task_id, priority, future);
194        wakeup_worker_thread(&(self.0).1, &(self.0).2);
195        result
196    }
197
198    /// 派发一个指定任务唯一id的异步任务到异步运行时,并立即让出任务的当前运行
199    fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
200        where
201            F: Future<Output=O> + Send + 'static {
202        if !(self.0).0.load(Ordering::SeqCst) {
203            return Err(Error::new(ErrorKind::Other, "Spawn yield async task by id failed, reason: worker already closed"));
204        }
205
206        let result = (self.0).2.spawn_yield_by_id(task_id, future);
207        wakeup_worker_thread(&(self.0).1, &(self.0).2);
208        result
209    }
210
211    /// 派发一个指定任务唯一id和在指定时间后执行的异步任务到异步运行时,时间单位ms
212    fn spawn_timing_by_id<F>(&self,
213                             task_id: TaskId,
214                             future: F,
215                             time: usize) -> Result<()>
216        where
217            F: Future<Output=O> + Send + 'static {
218        if !(self.0).0.load(Ordering::SeqCst) {
219            return Err(Error::new(ErrorKind::Other, "Spawn timing async task by id failed, reason: worker already closed"));
220        }
221
222        let result = (self.0).2.spawn_timing_by_id(task_id, future, time);
223        wakeup_worker_thread(&(self.0).1, &(self.0).2);
224        result
225    }
226
227    /// 挂起指定唯一id的异步任务
228    #[inline]
229    fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output> {
230        (self.0).2.pending::<Output>(task_id, waker)
231    }
232
233    /// 唤醒指定唯一id的异步任务
234    #[inline]
235    fn wakeup<Output: 'static>(&self, task_id: &TaskId) {
236        (self.0).2.wakeup::<Output>(task_id);
237    }
238
239    /// 挂起当前异步运行时的当前任务,并在指定的其它运行时上派发一个指定的异步任务,等待其它运行时上的异步任务完成后,唤醒当前运行时的当前任务,并返回其它运行时上的异步任务的值
240    #[inline]
241    fn wait<V: Send + 'static>(&self) -> AsyncWait<V> {
242        (self.0).2.wait()
243    }
244
245    /// 挂起当前异步运行时的当前任务,并在多个其它运行时上执行多个其它任务,其中任意一个任务完成,则唤醒当前运行时的当前任务,并返回这个已完成任务的值,而其它未完成的任务的值将被忽略
246    #[inline]
247    fn wait_any<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
248        (self.0).2.wait_any(capacity)
249    }
250
251    /// 挂起当前异步运行时的当前任务,并在多个其它运行时上执行多个其它任务,任务返回后需要通过用户指定的检查回调进行检查,其中任意一个任务检查通过,则唤醒当前运行时的当前任务,并返回这个已完成任务的值,而其它未完成或未检查通过的任务的值将被忽略,如果所有任务都未检查通过,则强制唤醒当前运行时的当前任务
252    #[inline]
253    fn wait_any_callback<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
254        (self.0).2.wait_any_callback(capacity)
255    }
256
257    /// 构建用于派发多个异步任务到指定运行时的映射归并,需要指定映射归并的容量
258    #[inline]
259    fn map_reduce<V: Send + 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
260        (self.0).2.map_reduce(capacity)
261    }
262
263    /// 挂起当前异步运行时的当前任务,等待指定的时间后唤醒当前任务
264    #[inline]
265    fn timeout(&self, timeout: usize) -> BoxFuture<'static, ()> {
266        (self.0).2.timeout(timeout)
267    }
268
269    /// 立即让出当前任务的执行
270    #[inline]
271    fn yield_now(&self) -> BoxFuture<'static, ()> {
272        (self.0).2.yield_now()
273    }
274
275    /// 生成一个异步管道,输入指定流,输入流的每个值通过过滤器生成输出流的值
276    #[inline]
277    fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> BoxStream<'static, FO>
278        where S: Stream<Item = SO> + Send + 'static,
279              SO: Send + 'static,
280              F: FnMut(SO) -> AsyncPipelineResult<FO> + Send + 'static,
281              FO: Send + 'static {
282        (self.0).2.pipeline(input, filter)
283    }
284
285    /// 关闭异步运行时,返回请求关闭是否成功
286    fn close(&self) -> bool {
287        if cfg!(target_arch = "aarch64") {
288            if let Ok(true) = (self.0).0.compare_exchange(true,
289                                                          false,
290                                                          Ordering::SeqCst,
291                                                          Ordering::SeqCst) {
292                //设置工作者状态成功,检查运行时所在线程是否需要唤醒
293                wakeup_worker_thread(&(self.0).1, &(self.0).2);
294                true
295            } else {
296                false
297            }
298        } else {
299            if let Ok(true) = (self.0).0.compare_exchange_weak(true,
300                                                               false,
301                                                               Ordering::SeqCst,
302                                                               Ordering::SeqCst) {
303                //设置工作者状态成功,检查运行时所在线程是否需要唤醒
304                wakeup_worker_thread(&(self.0).1, &(self.0).2);
305                true
306            } else {
307                false
308            }
309        }
310    }
311}
312
313impl<
314    O: Default + 'static,
315    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
316> AsyncRuntimeExt<O> for WorkerRuntime<O, P> {
317    #[inline]
318    fn spawn_with_context<F, C>(&self,
319                                task_id: TaskId,
320                                future: F,
321                                context: C) -> Result<()>
322        where F: Future<Output = O> + Send + 'static,
323              C: 'static {
324        (self.0).2.spawn_with_context(task_id, future, context)
325    }
326
327    #[inline]
328    fn spawn_timing_with_context<F, C>(&self,
329                                       task_id: TaskId,
330                                       future: F,
331                                       context: C,
332                                       time: usize) -> Result<()>
333        where F: Future<Output = O> + Send + 'static,
334              C: Send + 'static {
335        (self.0).2.spawn_timing_with_context(task_id, future, context, time)
336    }
337
338    #[inline]
339    fn block_on<F>(&self, future: F) -> Result<F::Output>
340        where F: Future + Send + 'static,
341              <F as Future>::Output: Default + Send + 'static {
342        (self.0).2.block_on::<F>(future)
343    }
344}
345
346impl<
347    O: Default + 'static,
348    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
349> WorkerRuntime<O, P> {
350    /// 获取工作者异步运行时的工作者状态
351    pub fn get_worker_status(&self) -> &Arc<AtomicBool> {
352        &(self.0).0
353    }
354
355    /// 获取工作者异步运行时的工作者线程状态
356    pub fn get_worker_waker(&self) -> &Arc<(AtomicBool, Mutex<()>, Condvar)> {
357        &(self.0).1
358    }
359
360    /// 获取工作者异步运行时的单线程异步运行时
361    pub fn get_worker_runtime(&self) -> &SingleTaskRuntime<O, P> {
362        &(self.0).2
363    }
364
365    /// 获取当前工作者异步运行时的本地异步运行时
366    pub fn to_local_runtime(&self) -> LocalAsyncRuntime<O> {
367        LocalAsyncRuntime {
368            inner: self.as_raw(),
369            get_id_func: WorkerRuntime::<O, P>::get_id_raw,
370            spawn_func: WorkerRuntime::<O, P>::spawn_raw,
371            spawn_timing_func: WorkerRuntime::<O, P>::spawn_timing_raw,
372            timeout_func: WorkerRuntime::<O, P>::timeout_raw,
373        }
374    }
375
376    // 获取当前工作者异步运行时的指针
377    #[inline]
378    pub(crate) fn as_raw(&self) -> *const () {
379        Arc::into_raw(self.0.clone()) as *const ()
380    }
381
382    // 获取指定指针的工作者异步运行时
383    #[inline]
384    pub(crate) fn from_raw(raw: *const ()) -> Self {
385        let inner = unsafe {
386            Arc::from_raw(raw as *const (
387                Arc<AtomicBool>,
388                Arc<(AtomicBool, Mutex<()>, Condvar)>,
389                SingleTaskRuntime<O, P>),
390            )
391        };
392        WorkerRuntime(inner)
393    }
394
395    // 获取当前异步运行时的唯一id
396    pub(crate) fn get_id_raw(raw: *const ()) -> usize {
397        let rt = WorkerRuntime::<O, P>::from_raw(raw);
398        let id = rt.get_id();
399        Arc::into_raw(rt.0); //避免提前释放
400        id
401    }
402
403    // 派发一个指定的异步任务到异步运行时
404    pub(crate) fn spawn_raw(raw: *const (),
405                            future: BoxFuture<'static, O>) -> Result<()> {
406        let rt = WorkerRuntime::<O, P>::from_raw(raw);
407        let result = rt.spawn_by_id(rt.alloc::<O>(), future);
408        Arc::into_raw(rt.0); //避免提前释放
409        result
410    }
411
412    // 定时派发一个指定的异步任务到异步运行时
413    pub(crate) fn spawn_timing_raw(raw: *const (),
414                                   future: BoxFuture<'static, O>,
415                                   timeout: usize) -> Result<()> {
416        let rt = WorkerRuntime::<O, P>::from_raw(raw);
417        let result = rt.spawn_timing_by_id(rt.alloc::<O>(), future, timeout);
418        Arc::into_raw(rt.0); //避免提前释放
419        result
420    }
421
422    // 挂起当前异步运行时的当前任务,等待指定的时间后唤醒当前任务
423    pub(crate) fn timeout_raw(raw: *const (),
424                              timeout: usize) -> BoxFuture<'static, ()> {
425        let rt = WorkerRuntime::<O, P>::from_raw(raw);
426        let boxed = rt.timeout(timeout);
427        Arc::into_raw(rt.0); //避免提前释放
428        boxed
429    }
430}
431
432///
433/// 工作者任务执行器
434///
435pub struct WorkerTaskRunner<
436    O: Default + 'static = (),
437    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P> = SingleTaskPool<O>,
438>(Arc<(
439    Arc<AtomicBool>,                        //工作者状态
440    Arc<(AtomicBool, Mutex<()>, Condvar)>,  //工作者线程唤醒器
441    SingleTaskRunner<O, P>,                 //单线程异步任务执行器
442    WorkerRuntime<O, P>,                    //工作者运行时
443)>);
444
445unsafe impl<
446    O: Default + 'static,
447    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
448> Send for WorkerTaskRunner<O, P> {}
449unsafe impl<
450    O: Default + 'static,
451    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
452> Sync for WorkerTaskRunner<O, P> {}
453
454impl<
455    O: Default + 'static,
456    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
457> Clone for WorkerTaskRunner<O, P> {
458    fn clone(&self) -> Self {
459        WorkerTaskRunner(self.0.clone())
460    }
461}
462
463impl<
464    O: Default + 'static,
465    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
466> From<(Arc<AtomicBool>, Arc<(AtomicBool, Mutex<()>, Condvar)>, SingleTaskRuntime<O, P>)> for WorkerRuntime<O, P> {
467    //将外部的工作者状态,工作者线程唤醒器和指定任务池的单线程异步运行时转换成工作者异步运行时
468    fn from(from: (Arc<AtomicBool>,
469                   Arc<(AtomicBool, Mutex<()>, Condvar)>,
470                   SingleTaskRuntime<O, P>,)) -> Self {
471        WorkerRuntime(Arc::new(from))
472    }
473}
474
475impl<O: Default + 'static> Default for WorkerTaskRunner<O> {
476    fn default() -> Self {
477        WorkerTaskRunner::new(SingleTaskPool::default(),
478                              Arc::new(AtomicBool::new(true)),
479                              Arc::new((AtomicBool::new(false), Mutex::new(()), Condvar::new())))
480    }
481}
482
483impl<
484    O: Default + 'static,
485    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
486> WorkerTaskRunner<O, P> {
487    /// 用指定的任务池构建工作者任务执行器
488    pub fn new(pool: P,
489               worker_status: Arc<AtomicBool>,
490               worker_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) -> Self {
491        let runner = SingleTaskRunner::new(pool);
492        let rt = runner.startup().unwrap();
493        let inner = (worker_status.clone(), worker_waker.clone(), rt);
494        let runtime = WorkerRuntime(Arc::new(inner));
495
496        let inner = (worker_status,
497                     worker_waker,
498                     runner,
499                     runtime);
500
501        WorkerTaskRunner(Arc::new(inner))
502    }
503
504    /// 获取当前工作者异步任务执行器的工作者运行时
505    pub fn get_runtime(&self) -> WorkerRuntime<O, P> {
506        (self.0).3.clone()
507    }
508
509    /// 运行一次工作者异步任务执行器,返回当前任务池中任务的数量
510    #[inline]
511    pub fn run_once(&self) -> Result<usize> {
512        (self.0).2.run_once()
513    }
514
515    /// 运行单线程异步任务执行器,并执行任务池中的所有任务
516    #[inline]
517    pub fn run(&self) -> Result<usize> {
518        (self.0).2.run()
519    }
520
521    /// 启动工作者异步任务执行器
522    pub fn startup<LF, GQL>(self,
523                            thread_name: &str,
524                            thread_stack_size: usize,
525                            sleep_timeout: u64,
526                            loop_interval: Option<u64>,
527                            loop_func: LF,
528                            get_queue_len: GQL) -> WorkerRuntime<O, P>
529        where P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
530              LF: Fn() -> (bool, Duration) + Send + 'static,
531              GQL: Fn() -> usize + Send + 'static{
532        let rt_copy = (self.0).3.clone();
533        let thread_handler = (self.0).0.clone();
534        let thread_waker = (self.0).1.clone();
535        spawn_worker_thread(
536            thread_name,
537            thread_stack_size,
538            thread_handler,
539            thread_waker,
540            sleep_timeout,
541            loop_interval,
542            loop_func,
543            move || {
544                rt_copy.wait_len() + get_queue_len()
545            },
546        );
547
548        (self.0).3.clone()
549    }
550}