pi_async_rt/rt/
serial_local_compatible_wasm_runtime.rs

1//! 兼容wasm的本地单线程异步运行时
2//!
3//! - [LocalTaskRunner]\: 本地异步任务执行器
4//! - [LocalTaskRuntime]\: 本地异步任务运行时
5//!
6//! [LocalTaskRunner]: struct.LocalTaskRunner.html
7//! [LocalTaskRuntime]: struct.LocalTaskRuntime.html
8//!
9//! # Examples
10//!
11//! ```
12//! use pi_async::rt::{AsyncRuntime, AsyncRuntimeExt, serial_local_compatible_wasm_thread::{LocalTaskRunner, LocalTaskRuntime}};
13//! let rt = LocalTaskRunner::<()>::new().into_local();
14//! let _ = rt.block_on(async move {});
15//! ```
16
17use std::thread;
18use std::vec::IntoIter;
19use std::future::Future;
20use std::cell::UnsafeCell;
21use std::collections::VecDeque;
22use std::task::{Context, Poll, Waker};
23use std::sync::{
24    atomic::{AtomicBool, Ordering},
25    Arc,
26};
27use std::io::{Error, Result, ErrorKind};
28use std::sync::atomic::AtomicUsize;
29use std::time::Instant;
30
31use async_stream::stream;
32use flume::bounded as async_bounded;
33use futures::{
34    future::{FutureExt, LocalBoxFuture},
35    stream::{LocalBoxStream, Stream, StreamExt},
36    task::{waker_ref, ArcWake},
37};
38
39use crate::{rt::{DEFAULT_MAX_HIGH_PRIORITY_BOUNDED, TaskId, AsyncPipelineResult, YieldNow, alloc_rt_uid,
40                 serial::{AsyncRuntime, AsyncRuntimeExt, AsyncTaskPool, AsyncTaskPoolExt, AsyncTask, AsyncMapReduce, AsyncWait, AsyncWaitAny, AsyncWaitAnyCallback}}};
41
42///
43/// 兼容wasm的本地单线程异步任务池
44///
45pub struct LocalTaskPool<O: Default + 'static> {
46    inner:  UnsafeCell<VecDeque<Arc<AsyncTask<Self, O>>>>,  //任务队列
47}
48
49unsafe impl<O: Default + 'static> Sync for LocalTaskPool<O> {}
50
51impl<O: Default + 'static> Default for LocalTaskPool<O> {
52    fn default() -> Self {
53        LocalTaskPool {
54            inner: UnsafeCell::new(VecDeque::default()),
55        }
56    }
57}
58
59impl<O: Default + 'static> AsyncTaskPool<O> for LocalTaskPool<O> {
60    type Pool = LocalTaskPool<O>;
61
62    #[inline]
63    fn get_thread_id(&self) -> usize {
64        0
65    }
66
67    #[inline]
68    fn len(&self) -> usize {
69        unsafe {
70            (&*self.inner.get()).len()
71        }
72    }
73
74    #[inline]
75    fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
76        self.push_local(task)
77    }
78
79    #[inline]
80    fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
81        unsafe {
82            (&mut *self.inner.get()).push_back(task);
83            Ok(())
84        }
85    }
86
87    #[inline]
88    fn push_priority(&self,
89                     _priority: usize,
90                     task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
91        self.push_local(task)
92    }
93
94    #[inline]
95    fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
96        self.push_local(task)
97    }
98
99    #[inline]
100    fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>> {
101        unsafe {
102            (&mut *self.inner.get()).pop_front()
103        }
104    }
105
106    #[inline]
107    fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>> {
108        let mut all = Vec::with_capacity(self.len());
109
110        let internal = unsafe { &mut *self.inner.get() };
111        for _ in 0..internal.len() {
112            if let Some(task) = internal.pop_front() {
113                all.push(task);
114            }
115        }
116
117        all.into_iter()
118    }
119}
120
121impl<O: Default + 'static> AsyncTaskPoolExt<O> for LocalTaskPool<O> {
122    fn worker_len(&self) -> usize {
123        1
124    }
125}
126
127impl<O: Default + 'static> LocalTaskPool<O> {
128    /// 构建一个兼容wasm的本地单线程异步任务池
129    pub fn new() -> Self {
130        Self::default()
131    }
132
133    /// 获取当前内部任务的数量
134    #[inline]
135    pub(crate) fn internal_len(&self) -> usize {
136        unsafe {
137            (&*self.inner.get()).len()
138        }
139    }
140
141    /// 将要唤醒指定的任务
142    #[inline]
143    pub(crate) fn will_wakeup(&self, task: Arc<AsyncTask<Self, O>>) {
144        unsafe {
145            (&mut *self.inner.get()).push_back(task);
146        }
147    }
148}
149
150///
151/// 兼容wasm的本地单线程异步运行时
152///
153pub struct LocalTaskRuntime<O: Default + 'static = ()>(Arc<InnerLocalTaskRuntime<O>>);
154
155struct InnerLocalTaskRuntime<O: Default + 'static = ()> {
156    uid:        usize,                  //运行时唯一id
157    running:    Arc<AtomicBool>,        //运行状态
158    pool:       Arc<LocalTaskPool<O>>,  //任务池
159}
160
161impl<O: Default + 'static> Clone for LocalTaskRuntime<O> {
162    fn clone(&self) -> Self {
163        LocalTaskRuntime(self.0.clone())
164    }
165}
166
167impl<O: Default + 'static> AsyncRuntime<O> for LocalTaskRuntime<O> {
168    type Pool = LocalTaskPool<O>;
169
170    fn shared_pool(&self) -> Arc<Self::Pool> {
171        self.0.pool.clone()
172    }
173
174    fn get_id(&self) -> usize {
175        self.0.uid
176    }
177
178    fn wait_len(&self) -> usize {
179        self.0.pool.len()
180    }
181
182    fn len(&self) -> usize {
183        self.wait_len()
184    }
185
186    fn alloc<R: 'static>(&self) -> TaskId {
187        TaskId(UnsafeCell::new(0))
188    }
189
190    fn spawn<F>(&self, future: F) -> Result<TaskId>
191        where
192            F: Future<Output = O> + 'static {
193        let task_id = self.alloc::<F::Output>();
194        if let Err(e) = self.spawn_by_id(task_id.clone(), future) {
195            return Err(e);
196        }
197
198        Ok(task_id)
199    }
200
201    fn spawn_local<F>(&self, future: F) -> Result<TaskId>
202        where
203            F: Future<Output = O> + 'static {
204        let task_id = self.alloc::<F::Output>();
205        if let Err(e) = self.spawn_local_by_id(task_id.clone(), future) {
206            return Err(e);
207        }
208
209        Ok(task_id)
210    }
211
212    fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
213        where
214            F: Future<Output = O> + 'static {
215        let task_id = self.alloc::<F::Output>();
216        if let Err(e) = self.spawn_priority_by_id(task_id.clone(), priority, future) {
217            return Err(e);
218        }
219
220        Ok(task_id)
221    }
222
223    fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
224        where
225            F: Future<Output = O> + 'static {
226        let task_id = self.alloc::<F::Output>();
227        if let Err(e) = self.spawn_yield_by_id(task_id.clone(), future) {
228            return Err(e);
229        }
230
231        Ok(task_id)
232    }
233
234    fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
235        where
236            F: Future<Output = O> + 'static {
237        let task_id = self.alloc::<F::Output>();
238        if let Err(e) = self.spawn_timing_by_id(task_id.clone(), future, time) {
239            return Err(e);
240        }
241
242        Ok(task_id)
243    }
244
245    fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
246        where
247            F: Future<Output = O> + 'static {
248        self.spawn_local_by_id(task_id, future)
249    }
250
251    fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
252        where
253            F: Future<Output = O> + 'static {
254        if let Err(e) = self.0.pool.push_local(Arc::new(AsyncTask::new(
255            task_id,
256            self.0.pool.clone(),
257            DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
258            Some(future.boxed_local()),
259        ))) {
260            return Err(Error::new(ErrorKind::Other, e));
261        }
262
263        Ok(())
264    }
265
266    fn spawn_priority_by_id<F>(&self,
267                               task_id: TaskId,
268                               _priority: usize,
269                               future: F) -> Result<()>
270        where
271            F: Future<Output = O> + 'static {
272        if let Err(e) = self.0.pool.push_priority(DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
273                                                  Arc::new(AsyncTask::new(
274                                                      task_id,
275                                                      self.0.pool.clone(),
276                                                      DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
277                                                      Some(future.boxed_local()),
278                                                  ))) {
279            return Err(Error::new(ErrorKind::Other, e));
280        }
281
282        Ok(())
283    }
284
285    /// 派发一个指定任务唯一id的异步任务到异步运行时,并立即让出任务的当前运行
286    #[inline]
287    fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
288        where
289            F: Future<Output = O> + 'static {
290        self.spawn_priority_by_id(task_id,
291                                  DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
292                                  future)
293    }
294
295    /// 派发一个指定任务唯一id和在指定时间后执行的异步任务到异步运行时,时间单位ms
296    fn spawn_timing_by_id<F>(&self,
297                             _task_id: TaskId,
298                             _future: F,
299                             _time: usize) -> Result<()>
300        where
301            F: Future<Output = O> + 'static {
302        Err(Error::new(ErrorKind::Other, "unimplemented"))
303    }
304
305    /// 挂起指定唯一id的异步任务
306    fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output> {
307        unimplemented!()
308    }
309
310    /// 唤醒指定唯一id的异步任务
311    fn wakeup<Output: 'static>(&self, task_id: &TaskId) {
312        unimplemented!()
313    }
314
315    /// 挂起当前异步运行时的当前任务,并在指定的其它运行时上派发一个指定的异步任务,等待其它运行时上的异步任务完成后,唤醒当前运行时的当前任务,并返回其它运行时上的异步任务的值
316    fn wait<V: 'static>(&self) -> AsyncWait<V> {
317        AsyncWait::new(self.wait_any(2))
318    }
319
320    /// 挂起当前异步运行时的当前任务,并在多个其它运行时上执行多个其它任务,其中任意一个任务完成,则唤醒当前运行时的当前任务,并返回这个已完成任务的值,而其它未完成的任务的值将被忽略
321    fn wait_any<V: 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
322        let (producor, consumer) = async_bounded(capacity);
323
324        AsyncWaitAny::new(capacity, producor, consumer)
325    }
326
327    /// 挂起当前异步运行时的当前任务,并在多个其它运行时上执行多个其它任务,任务返回后需要通过用户指定的检查回调进行检查,其中任意一个任务检查通过,则唤醒当前运行时的当前任务,并返回这个已完成任务的值,而其它未完成或未检查通过的任务的值将被忽略,如果所有任务都未检查通过,则强制唤醒当前运行时的当前任务
328    fn wait_any_callback<V: 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
329        let (producor, consumer) = async_bounded(capacity);
330
331        AsyncWaitAnyCallback::new(capacity, producor, consumer)
332    }
333
334    /// 构建用于派发多个异步任务到指定运行时的映射归并,需要指定映射归并的容量
335    fn map_reduce<V: 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
336        let (producor, consumer) = async_bounded(capacity);
337
338        AsyncMapReduce::new(0, capacity, producor, consumer)
339    }
340
341    /// 挂起当前异步运行时的当前任务,等待指定的时间后唤醒当前任务
342    fn timeout(&self, _timeout: usize) -> LocalBoxFuture<'static, ()> {
343        unimplemented!()
344    }
345
346    /// 立即让出当前任务的执行
347    fn yield_now(&self) -> LocalBoxFuture<'static, ()> {
348        async move {
349            YieldNow(false).await;
350        }.boxed_local()
351    }
352
353    /// 生成一个异步管道,输入指定流,输入流的每个值通过过滤器生成输出流的值
354    fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> LocalBoxStream<'static, FO>
355        where
356            S: Stream<Item = SO> + 'static,
357            SO: 'static,
358            F: FnMut(SO) -> AsyncPipelineResult<FO> + 'static,
359            FO: 'static,
360    {
361        let output = stream! {
362            for await value in input {
363                match filter(value) {
364                    AsyncPipelineResult::Disconnect => {
365                        //立即中止管道
366                        break;
367                    },
368                    AsyncPipelineResult::Filtered(result) => {
369                        yield result;
370                    },
371                }
372            }
373        };
374
375        output.boxed_local()
376    }
377
378    /// 关闭异步运行时,返回请求关闭是否成功
379    fn close(&self) -> bool {
380        if cfg!(target_arch = "aarch64") {
381            if let Ok(true) =
382                self
383                    .0
384                    .running
385                    .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
386            {
387                //设置运行状态成功
388                true
389            } else {
390                false
391            }
392        } else {
393            if let Ok(true) =
394                self
395                    .0
396                    .running
397                    .compare_exchange_weak(true, false, Ordering::SeqCst, Ordering::SeqCst)
398            {
399                //设置运行状态成功
400                true
401            } else {
402                false
403            }
404        }
405    }
406}
407
408impl<O: Default + 'static> AsyncRuntimeExt<O> for LocalTaskRuntime<O> {
409    fn spawn_with_context<F, C>(&self,
410                                _task_id: TaskId,
411                                _future: F,
412                                _context: C) -> Result<()>
413        where F: Future<Output = O> + 'static,
414              C: 'static {
415        Err(Error::new(ErrorKind::Other, "unimplemented"))
416    }
417
418    /// 派发一个在指定时间后执行的异步任务到异步运行时,并指定异步任务的初始化上下文,时间单位ms
419    fn spawn_timing_with_context<F, C>(&self,
420                                       task_id: TaskId,
421                                       future: F,
422                                       context: C,
423                                       time: usize) -> Result<()>
424        where F: Future<Output = O> + 'static,
425              C: 'static {
426        Err(Error::new(ErrorKind::Other, "unimplemented"))
427    }
428
429    /// 立即创建一个指定任务池的异步运行时,并执行指定的异步任务,阻塞当前线程,等待异步任务完成后返回
430    fn block_on<F>(&self, future: F) -> Result<F::Output>
431        where F: Future + 'static,
432              <F as Future>::Output: Default + 'static {
433        let runner = LocalTaskRunner(self.clone());
434        let mut result: Option<<F as Future>::Output> = None;
435        let result_ptr = (&mut result) as *mut Option<<F as Future>::Output>;
436
437        self.spawn_local(async move {
438            //在指定运行时中执行,并返回结果
439            let r = future.await;
440            unsafe {
441                *result_ptr = Some(r);
442            }
443
444            Default::default()
445        });
446
447        loop {
448            //执行异步任务
449            while self.internal_len() > 0 {
450                runner.run_once();
451            }
452
453            //尝试获取异步任务的执行结果
454            if let Some(result) = result.take() {
455                //异步任务已完成,则立即返回执行结果
456                return Ok(result);
457            }
458        }
459    }
460}
461
462impl<O: Default + 'static> LocalTaskRuntime<O> {
463    /// 判断当前本地异步任务运行时是否正在运行
464    #[inline]
465    pub fn is_running(&self) -> bool {
466        self
467            .0
468            .running
469            .load(Ordering::Relaxed)
470    }
471
472    /// 获取当前内部任务的数量
473    #[inline]
474    pub fn internal_len(&self) -> usize {
475       self.0.pool.internal_len()
476    }
477
478    /// 将要唤醒指定的任务
479    #[inline]
480    pub(crate) fn will_wakeup(&self, task: Arc<AsyncTask<<Self as AsyncRuntime<O>>::Pool, O>>) {
481        self.0.pool.will_wakeup(task);
482    }
483
484    /// 线程安全的发送一个异步任务到异步运行时
485    pub fn send<F>(&self, future: F)
486        where
487            F: Future<Output = O> + 'static,
488    {
489        let task_id = self.alloc::<F::Output>();
490        self.0.pool.push(Arc::new(AsyncTask::new(
491            task_id,
492            self.0.pool.clone(),
493            DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
494            Some(future.boxed_local()),
495        )));
496    }
497}
498
499///
500/// 本地异步任务执行器
501///
502pub struct LocalTaskRunner<O: Default + 'static = ()>(LocalTaskRuntime<O>);
503
504unsafe impl<O: Default + 'static> Send for LocalTaskRunner<O> {}
505impl<O: Default + 'static> !Sync for LocalTaskRunner<O> {}
506
507impl<O: Default + 'static> LocalTaskRunner<O> {
508    /// 构建本地异步任务执行器
509    pub fn new() -> Self {
510        let inner = InnerLocalTaskRuntime {
511            uid: alloc_rt_uid(),
512            running: Arc::new(AtomicBool::new(false)),
513            pool: Arc::new(LocalTaskPool::new()),
514        };
515
516        LocalTaskRunner(LocalTaskRuntime(Arc::new(inner)))
517    }
518
519    /// 获取当前本地异步任务执行器的运行时
520    pub fn get_runtime(&self) -> LocalTaskRuntime<O> {
521        self.0.clone()
522    }
523
524    /// 启动工作者异步任务执行器
525    pub fn startup(self,
526                   thread_name: &str,
527                   thread_stack_size: usize) -> LocalTaskRuntime<O> {
528        let rt = self.get_runtime();
529        let rt_copy = rt.clone();
530        let _ = thread::Builder::new()
531            .name(thread_name.to_string())
532            .stack_size(thread_stack_size)
533            .spawn(move || {
534                rt_copy
535                    .0
536                    .running
537                    .store(true, Ordering::Relaxed);
538
539                while rt_copy.is_running() {
540                    self.run_once();
541                }
542            });
543
544        rt
545    }
546
547    // 运行一次本地异步任务执行器
548    #[inline]
549    pub fn run_once(&self) {
550        unsafe {
551            if let Some(task) = (self.0).0.pool.try_pop() {
552                let waker = waker_ref(&task);
553                let mut context = Context::from_waker(&*waker);
554                if let Some(mut future) = task.get_inner() {
555                    if let Poll::Pending = future.as_mut().poll(&mut context) {
556                        //当前未准备好,则恢复本地异步任务,以保证本地异步任务不被提前释放
557                        task.set_inner(Some(future));
558                    }
559                }
560            }
561        }
562    }
563
564    /// 转换为本地异步任务运行时
565    pub fn into_local(self) -> LocalTaskRuntime<O> {
566        self.0
567    }
568}
569
570#[test]
571fn test_local_compatible_wasm_runtime_block_on() {
572    struct AtomicCounter(AtomicUsize, Instant);
573    impl Drop for AtomicCounter {
574        fn drop(&mut self) {
575            {
576                println!(
577                    "!!!!!!drop counter, count: {:?}, time: {:?}",
578                    self.0.load(Ordering::Relaxed),
579                    Instant::now() - self.1
580                );
581            }
582        }
583    }
584
585    let rt = LocalTaskRunner::<()>::new().into_local();
586
587    let counter = Arc::new(AtomicCounter(AtomicUsize::new(0), Instant::now()));
588    for _ in 0..10000000 {
589        let counter_copy = counter.clone();
590        let _ = rt.block_on(async move { counter_copy.0.fetch_add(1, Ordering::Relaxed) });
591    }
592}
593
594
595
596
597
598
599
600