pi_async_rt/rt/
serial_local_thread.rs

1//! 本地单线程异步运行时
2//!
3//! - [LocalTaskRunner]\: 本地异步任务执行器
4//! - [LocalTaskRuntime]\: 本地异步任务运行时
5//!
6//! [LocalTaskRunner]: struct.LocalTaskRunner.html
7//! [LocalTaskRuntime]: struct.LocalTaskRuntime.html
8//!
9//! # Examples
10//!
11//! ```
12//! use pi_async::rt::{AsyncRuntime, AsyncRuntimeExt, serial_local_thread::{LocalTaskRunner, LocalTaskRuntime}};
13//! let rt = LocalTaskRunner::<()>::new().into_local();
14//! let _ = rt.block_on(async move {});
15//! ```
16
17use std::thread;
18use std::task::Context;
19use std::future::Future;
20use std::cell::UnsafeCell;
21use std::task::Poll::Pending;
22use std::collections::VecDeque;
23use std::io::Result as IOResult;
24use std::sync::{
25    atomic::{AtomicBool, Ordering},
26    Arc,
27};
28use std::sync::atomic::AtomicUsize;
29use std::time::{Duration, Instant};
30
31use async_stream::stream;
32use crossbeam_queue::SegQueue;
33use flume::bounded as async_bounded;
34use futures::{
35    future::{FutureExt, LocalBoxFuture},
36    stream::{LocalBoxStream, Stream, StreamExt},
37    task::{waker_ref, ArcWake},
38};
39#[cfg(not(target_arch = "wasm32"))]
40use polling::{Events, Poller};
41
42use crate::{
43    rt::{
44        alloc_rt_uid,
45        serial::{AsyncMapReduce, AsyncWait, AsyncWaitAny, AsyncWaitAnyCallback},
46        AsyncPipelineResult, YieldNow
47    },
48};
49
50// 本地异步任务
51pub(crate) struct LocalTask<O: Default + 'static = ()> {
52    inner: UnsafeCell<Option<LocalBoxFuture<'static, O>>>, //内部本地异步任务
53    runtime: LocalTaskRuntime<O>,                          //本地异步任务运行时
54}
55
56unsafe impl<O: Default + 'static> Send for LocalTask<O> {}
57unsafe impl<O: Default + 'static> Sync for LocalTask<O> {}
58
59impl<O: Default + 'static> ArcWake for LocalTask<O> {
60    fn wake_by_ref(arc_self: &Arc<Self>) {
61        arc_self.runtime.will_wakeup(arc_self.clone());
62    }
63}
64
65impl<O: Default + 'static> LocalTask<O> {
66    // 获取内部本地异步任务
67    pub fn get_inner(&self) -> Option<LocalBoxFuture<'static, O>> {
68        unsafe { (&mut *self.inner.get()).take() }
69    }
70
71    // 设置内部本地异步任务
72    pub fn set_inner(&self, inner: Option<LocalBoxFuture<'static, O>>) {
73        unsafe {
74            *self.inner.get() = inner;
75        }
76    }
77}
78
79///
80/// 本地异步任务运行时
81///
82#[cfg(not(target_arch = "wasm32"))]
83pub struct LocalTaskRuntime<O: Default + 'static = ()>(
84    Arc<(
85        usize,                                      //运行时唯一id
86        Arc<AtomicBool>,                            //运行状态
87        SegQueue<Arc<LocalTask<O>>>,                //外部任务队列
88        UnsafeCell<VecDeque<Arc<LocalTask<O>>>>,    //内部任务队列
89        Option<AtomicBool>,                         //合并唤醒标志
90        Option<Arc<Poller>>,                        //用于阻塞等待和跨线程唤醒
91    )>,
92);
93///
94/// 本地异步任务运行时
95///
96#[cfg(target_arch = "wasm32")]
97pub struct LocalTaskRuntime<O: Default + 'static = ()>(
98    Arc<(
99        usize,                                      //运行时唯一id
100        Arc<AtomicBool>,                            //运行状态
101        SegQueue<Arc<LocalTask<O>>>,                //外部任务队列
102        UnsafeCell<VecDeque<Arc<LocalTask<O>>>>,    //内部任务队列
103        Option<AtomicBool>,                         //合并唤醒标志
104    )>,
105);
106
107unsafe impl<O: Default + 'static> Send for LocalTaskRuntime<O> {}
108impl<O: Default + 'static> !Sync for LocalTaskRuntime<O> {}
109
110impl<O: Default + 'static> Clone for LocalTaskRuntime<O> {
111    fn clone(&self) -> Self {
112        LocalTaskRuntime(self.0.clone())
113    }
114}
115
116impl<O: Default + 'static> LocalTaskRuntime<O> {
117    /// 判断当前本地异步任务运行时是否正在运行
118    #[inline]
119    pub fn is_running(&self) -> bool {
120        self.0.1.load(Ordering::Relaxed)
121    }
122
123    /// 获取当前异步运行时的唯一id
124    pub fn get_id(&self) -> usize {
125        self.0.0
126    }
127
128    /// 获取当前异步运行时任务数量
129    pub fn len(&self) -> usize {
130        unsafe {
131            (self.0).2.len() + self.internal_len()
132        }
133    }
134
135    /// 获取当前内部任务的数量
136    #[inline]
137    pub(crate) fn internal_len(&self) -> usize {
138        unsafe {
139            (&*self.0.3.get()).len()
140        }
141    }
142
143    /// 派发一个指定的异步任务到异步运行时
144    pub fn spawn<F>(&self, future: F)
145    where
146        F: Future<Output = O> + 'static,
147    {
148        unsafe {
149            (&mut *(self.0).3.get()).push_back(Arc::new(LocalTask {
150                inner: UnsafeCell::new(Some(future.boxed_local())),
151                runtime: self.clone(),
152            }));
153        }
154    }
155
156    /// 将要唤醒指定的任务
157    #[inline]
158    pub(crate) fn will_wakeup(&self, task: Arc<LocalTask<O>>) {
159        self.0.2.push(task);
160    }
161
162    /// 线程安全的发送一个异步任务到异步运行时
163    pub fn send<F>(&self, future: F)
164    where
165        F: Future<Output = O> + 'static,
166    {
167        self.0.2.push(Arc::new(LocalTask {
168            inner: UnsafeCell::new(Some(future.boxed_local())),
169            runtime: self.clone(),
170        }));
171
172        #[cfg(not(target_arch = "wasm32"))]
173        if let Some(sleeping) = &self.0.4 {
174            if sleeping.compare_exchange(
175                false,
176                true,
177                Ordering::AcqRel,
178                Ordering::Relaxed).is_ok()
179            {
180                //需要唤醒
181                let _ = self
182                    .0
183                    .5
184                    .as_ref()
185                    .unwrap()
186                    .notify();
187            }
188        }
189    }
190
191    /// 将外部任务队列中的任务移动到内部任务队列
192    #[inline]
193    pub fn poll(&self) {
194        let internal = unsafe { &mut * (self.0).3.get() };
195        while let Some(task) = (self.0).2.pop() {
196            internal.push_back(task);
197        }
198    }
199
200    /// 挂起当前异步运行时的当前任务,并在指定的其它运行时上派发一个指定的异步任务,等待其它运行时上的异步任务完成后,唤醒当前运行时的当前任务,并返回其它运行时上的异步任务的值
201    pub fn wait<V: 'static>(&self) -> AsyncWait<V> {
202        AsyncWait::new(self.wait_any(2))
203    }
204
205    /// 挂起当前异步运行时的当前任务,并在多个其它运行时上执行多个其它任务,其中任意一个任务完成,则唤醒当前运行时的当前任务,并返回这个已完成任务的值,而其它未完成的任务的值将被忽略
206    pub fn wait_any<V: 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
207        let (producor, consumer) = async_bounded(capacity);
208
209        AsyncWaitAny::new(capacity, producor, consumer)
210    }
211
212    /// 挂起当前异步运行时的当前任务,并在多个其它运行时上执行多个其它任务,任务返回后需要通过用户指定的检查回调进行检查,其中任意一个任务检查通过,则唤醒当前运行时的当前任务,并返回这个已完成任务的值,而其它未完成或未检查通过的任务的值将被忽略,如果所有任务都未检查通过,则强制唤醒当前运行时的当前任务
213    pub fn wait_any_callback<V: 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
214        let (producor, consumer) = async_bounded(capacity);
215
216        AsyncWaitAnyCallback::new(capacity, producor, consumer)
217    }
218
219    /// 构建用于派发多个异步任务到指定运行时的映射归并,需要指定映射归并的容量
220    pub fn map_reduce<V: 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
221        let (producor, consumer) = async_bounded(capacity);
222
223        AsyncMapReduce::new(0, capacity, producor, consumer)
224    }
225
226    /// 立即让出当前任务的执行
227    pub fn yield_now(&self) -> LocalBoxFuture<'static, ()> {
228        async move {
229            YieldNow(false).await;
230        }.boxed_local()
231    }
232
233    /// 生成一个异步管道,输入指定流,输入流的每个值通过过滤器生成输出流的值
234    pub fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> LocalBoxStream<'static, FO>
235    where
236        S: Stream<Item = SO> + 'static,
237        SO: 'static,
238        F: FnMut(SO) -> AsyncPipelineResult<FO> + 'static,
239        FO: 'static,
240    {
241        let output = stream! {
242            for await value in input {
243                match filter(value) {
244                    AsyncPipelineResult::Disconnect => {
245                        //立即中止管道
246                        break;
247                    },
248                    AsyncPipelineResult::Filtered(result) => {
249                        yield result;
250                    },
251                }
252            }
253        };
254
255        output.boxed_local()
256    }
257
258    /// 阻塞当前线程,并在当前线程内执行指定的异步任务,返回指定异步任务执行后的结果
259    pub fn block_on<F>(&self, future: F) -> IOResult<F::Output>
260        where
261            F: Future + 'static,
262            <F as Future>::Output: Default + 'static,
263    {
264        let runner = LocalTaskRunner(self.clone());
265        let mut result: Option<<F as Future>::Output> = None;
266        let result_ptr = (&mut result) as *mut Option<<F as Future>::Output>;
267
268        self.spawn(async move {
269            //在指定运行时中执行,并返回结果
270            let r = future.await;
271            unsafe {
272                *result_ptr = Some(r);
273            }
274
275            Default::default()
276        });
277
278        loop {
279            //执行异步任务
280            while self.internal_len() > 0 {
281                runner.run_once();
282            }
283
284            //尝试获取异步任务的执行结果
285            if let Some(result) = result.take() {
286                //异步任务已完成,则立即返回执行结果
287                return Ok(result);
288            }
289        }
290    }
291
292    /// 关闭异步运行时,返回请求关闭是否成功
293    pub fn close(self) -> bool {
294        if cfg!(target_arch = "aarch64") {
295            if let Ok(true) =
296                (self.0)
297                    .1
298                    .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
299            {
300                //设置运行状态成功
301                true
302            } else {
303                false
304            }
305        } else {
306            if let Ok(true) =
307                (self.0)
308                    .1
309                    .compare_exchange_weak(true, false, Ordering::SeqCst, Ordering::SeqCst)
310            {
311                //设置运行状态成功
312                true
313            } else {
314                false
315            }
316        }
317    }
318}
319
320///
321/// 本地异步任务执行器
322///
323pub struct LocalTaskRunner<O: Default + 'static = ()>(LocalTaskRuntime<O>);
324
325unsafe impl<O: Default + 'static> Send for LocalTaskRunner<O> {}
326impl<O: Default + 'static> !Sync for LocalTaskRunner<O> {}
327
328impl<O: Default + 'static> LocalTaskRunner<O> {
329    /// 构建本地异步任务执行器
330    pub fn new() -> Self {
331        #[cfg(not(target_arch = "wasm32"))]
332        let inner = (
333            alloc_rt_uid(),
334            Arc::new(AtomicBool::new(false)),
335            SegQueue::new(),
336            UnsafeCell::new(VecDeque::new()),
337            None,
338            None,
339        );
340        #[cfg(target_arch = "wasm32")]
341        let inner = (
342            crate::rt::alloc_rt_uid(),
343            Arc::new(AtomicBool::new(false)),
344            SegQueue::new(),
345            UnsafeCell::new(VecDeque::new()),
346            None,
347        );
348
349        LocalTaskRunner(LocalTaskRuntime(Arc::new(inner)))
350    }
351
352    /// 构建一个指定了Poller的本地异步任务执行器
353    /// 注意当异步任务执行器在运行时,外部不允许使用wait方法
354    #[cfg(not(target_arch = "wasm32"))]
355    pub fn with_poll(poller: Arc<Poller>) -> Self {
356        let inner = (
357            alloc_rt_uid(),
358            Arc::new(AtomicBool::new(false)),
359            SegQueue::new(),
360            UnsafeCell::new(VecDeque::new()),
361            Some(AtomicBool::new(false)),
362            Some(poller),
363        );
364
365        LocalTaskRunner(LocalTaskRuntime(Arc::new(inner)))
366    }
367
368    /// 判断是指定使用Poll
369    #[cfg(not(target_arch = "wasm32"))]
370    #[inline(always)]
371    pub fn is_with_polling(&self) -> bool {
372        self.0.0.4.is_some()
373            && self.0.0.5.is_some()
374    }
375
376    /// 获取当前本地异步任务执行器的运行时
377    pub fn get_runtime(&self) -> LocalTaskRuntime<O> {
378        self.0.clone()
379    }
380
381    /// 启动工作者异步任务执行器
382    pub fn startup(
383        self,
384        thread_name: &str,
385        thread_stack_size: usize
386    ) -> LocalTaskRuntime<O> {
387        let rt = self.get_runtime();
388        let rt_copy = rt.clone();
389        let _ = thread::Builder::new()
390            .name(thread_name.to_string())
391            .stack_size(thread_stack_size)
392            .spawn(move || {
393                (rt_copy.0).1.store(true, Ordering::Relaxed);
394
395                while rt_copy.is_running() {
396                    self.poll();
397                    self.run_once();
398                }
399            });
400
401        rt
402    }
403
404    /// 启动设置了Poller的工作者异步任务执行器
405    #[cfg(not(target_arch = "wasm32"))]
406    pub fn startup_with_poll(
407        self,
408        thread_name: &str,
409        thread_stack_size: usize,
410        try_count: usize,
411        timeout: Option<Duration>,
412    ) -> LocalTaskRuntime<O> {
413        let rt = self.get_runtime();
414        let rt_copy = rt.clone();
415        let _ = thread::Builder::new()
416            .name(thread_name.to_string())
417            .stack_size(thread_stack_size)
418            .spawn(move || {
419                (rt_copy.0).1.store(true, Ordering::Relaxed);
420
421                let mut count = try_count;
422                while rt_copy.is_running() {
423                    self.poll();
424                    self.run_once();
425                    match self.try_sleep(count, timeout) {
426                        Err(e) => {
427                            rt_copy.0.1.store(false, Ordering::Release);
428                            panic!("Run runtime failed, reason: {:?}", e);
429                        },
430                        Ok(Some(new_count)) => {
431                            count = new_count;
432                            continue;
433                        },
434                        Ok(None) => {
435                            count = try_count;
436                            continue;
437                        },
438                    }
439                }
440            });
441
442        rt
443    }
444
445    /// 将外部任务队列中的任务移动到内部任务队列
446    /// 注意应该在执行run_once之前调用
447    #[inline]
448    pub fn poll(&self) {
449        while let Some(task) = ((self.0).0).2.pop() {
450            unsafe {
451               (&mut *((self.0).0).3.get()).push_back(task);
452            }
453        }
454    }
455
456    /// 运行一次本地异步任务执行器
457    #[inline]
458    pub fn run_once(&self) {
459        unsafe {
460            if let Some(task) = (&mut *(&(self.0).0).3.get()).pop_front() {
461                let waker = waker_ref(&task);
462                let mut context = Context::from_waker(&*waker);
463                if let Some(mut future) = task.get_inner() {
464                    if let Pending = future.as_mut().poll(&mut context) {
465                        //当前未准备好,则恢复本地异步任务,以保证本地异步任务不被提前释放
466                        task.set_inner(Some(future));
467                    }
468                }
469            }
470        }
471    }
472
473    /// 尝试休眠当前推动运行时的线程,并在派发新任务或休眠超时后唤醒
474    /// 返回Some表示还需要至少多少次尝试以后才可能休眠
475    /// 注意休眠只在设置了Poller后有效,且在执行run_once后调用
476    #[cfg(not(target_arch = "wasm32"))]
477    #[inline]
478    pub fn try_sleep(
479        &self,
480        try_count: usize,
481        timeout: Option<Duration>
482    ) -> IOResult<Option<usize>> {
483        if !self.is_with_polling() {
484            return Ok(Some(try_count));
485        }
486
487        if self.0.len() != 0 {
488            //还有任务需要处理,则立即返回
489            return Ok(Some(try_count));
490        }
491
492        if try_count != 0 {
493            //尝试计数不为0,则立即返回
494            return Ok(Some(try_count - 1));
495        }
496
497        let mut events = Events::with_capacity(std::num::NonZeroUsize::new(1).unwrap());
498        let _ = self
499            .0
500            .0
501            .5
502            .as_ref()
503            .unwrap()
504            .wait(&mut events, timeout)?;
505        self
506            .0
507            .0
508            .4
509            .as_ref()
510            .unwrap()
511            .store(false, Ordering::Release);
512
513        return Ok(None);
514    }
515
516    /// 转换为本地异步任务运行时
517    pub fn into_local(self) -> LocalTaskRuntime<O> {
518        self.0
519    }
520}
521
522#[test]
523fn test_local_runtime_block_on() {
524    struct AtomicCounter(AtomicUsize, Instant);
525    impl Drop for AtomicCounter {
526        fn drop(&mut self) {
527            {
528                println!(
529                    "!!!!!!drop counter, count: {:?}, time: {:?}",
530                    self.0.load(Ordering::Relaxed),
531                    Instant::now() - self.1
532                );
533            }
534        }
535    }
536
537    let rt = LocalTaskRunner::<()>::new().into_local();
538
539    let counter = Arc::new(AtomicCounter(AtomicUsize::new(0), Instant::now()));
540    for _ in 0..10000000 {
541        let counter_copy = counter.clone();
542        let _ = rt.block_on(async move { counter_copy.0.fetch_add(1, Ordering::Relaxed) });
543    }
544}