1use std::thread;
18use std::vec::IntoIter;
19use std::future::Future;
20use std::cell::UnsafeCell;
21use std::collections::VecDeque;
22use std::task::{Context, Poll, Waker};
23use std::sync::{
24 atomic::{AtomicBool, Ordering},
25 Arc,
26};
27use std::io::{Error, Result, ErrorKind};
28use std::sync::atomic::AtomicUsize;
29use std::time::Instant;
30
31use async_stream::stream;
32use flume::bounded as async_bounded;
33use futures::{
34 future::{FutureExt, LocalBoxFuture},
35 stream::{LocalBoxStream, Stream, StreamExt},
36 task::{waker_ref, ArcWake},
37};
38
39use crate::{rt::{DEFAULT_MAX_HIGH_PRIORITY_BOUNDED, TaskId, AsyncPipelineResult, YieldNow, alloc_rt_uid,
40 serial::{AsyncRuntime, AsyncRuntimeExt, AsyncTaskPool, AsyncTaskPoolExt, AsyncTask, AsyncMapReduce, AsyncWait, AsyncWaitAny, AsyncWaitAnyCallback}}};
41
42pub struct LocalTaskPool<O: Default + 'static> {
46 inner: UnsafeCell<VecDeque<Arc<AsyncTask<Self, O>>>>, }
48
49unsafe impl<O: Default + 'static> Sync for LocalTaskPool<O> {}
50
51impl<O: Default + 'static> Default for LocalTaskPool<O> {
52 fn default() -> Self {
53 LocalTaskPool {
54 inner: UnsafeCell::new(VecDeque::default()),
55 }
56 }
57}
58
59impl<O: Default + 'static> AsyncTaskPool<O> for LocalTaskPool<O> {
60 type Pool = LocalTaskPool<O>;
61
62 #[inline]
63 fn get_thread_id(&self) -> usize {
64 0
65 }
66
67 #[inline]
68 fn len(&self) -> usize {
69 unsafe {
70 (&*self.inner.get()).len()
71 }
72 }
73
74 #[inline]
75 fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
76 self.push_local(task)
77 }
78
79 #[inline]
80 fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
81 unsafe {
82 (&mut *self.inner.get()).push_back(task);
83 Ok(())
84 }
85 }
86
87 #[inline]
88 fn push_priority(&self,
89 _priority: usize,
90 task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
91 self.push_local(task)
92 }
93
94 #[inline]
95 fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
96 self.push_local(task)
97 }
98
99 #[inline]
100 fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>> {
101 unsafe {
102 (&mut *self.inner.get()).pop_front()
103 }
104 }
105
106 #[inline]
107 fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>> {
108 let mut all = Vec::with_capacity(self.len());
109
110 let internal = unsafe { &mut *self.inner.get() };
111 for _ in 0..internal.len() {
112 if let Some(task) = internal.pop_front() {
113 all.push(task);
114 }
115 }
116
117 all.into_iter()
118 }
119}
120
121impl<O: Default + 'static> AsyncTaskPoolExt<O> for LocalTaskPool<O> {
122 fn worker_len(&self) -> usize {
123 1
124 }
125}
126
127impl<O: Default + 'static> LocalTaskPool<O> {
128 pub fn new() -> Self {
130 Self::default()
131 }
132
133 #[inline]
135 pub(crate) fn internal_len(&self) -> usize {
136 unsafe {
137 (&*self.inner.get()).len()
138 }
139 }
140
141 #[inline]
143 pub(crate) fn will_wakeup(&self, task: Arc<AsyncTask<Self, O>>) {
144 unsafe {
145 (&mut *self.inner.get()).push_back(task);
146 }
147 }
148}
149
150pub struct LocalTaskRuntime<O: Default + 'static = ()>(Arc<InnerLocalTaskRuntime<O>>);
154
155struct InnerLocalTaskRuntime<O: Default + 'static = ()> {
156 uid: usize, running: Arc<AtomicBool>, pool: Arc<LocalTaskPool<O>>, }
160
161impl<O: Default + 'static> Clone for LocalTaskRuntime<O> {
162 fn clone(&self) -> Self {
163 LocalTaskRuntime(self.0.clone())
164 }
165}
166
167impl<O: Default + 'static> AsyncRuntime<O> for LocalTaskRuntime<O> {
168 type Pool = LocalTaskPool<O>;
169
170 fn shared_pool(&self) -> Arc<Self::Pool> {
171 self.0.pool.clone()
172 }
173
174 fn get_id(&self) -> usize {
175 self.0.uid
176 }
177
178 fn wait_len(&self) -> usize {
179 self.0.pool.len()
180 }
181
182 fn len(&self) -> usize {
183 self.wait_len()
184 }
185
186 fn alloc<R: 'static>(&self) -> TaskId {
187 TaskId(UnsafeCell::new(0))
188 }
189
190 fn spawn<F>(&self, future: F) -> Result<TaskId>
191 where
192 F: Future<Output = O> + 'static {
193 let task_id = self.alloc::<F::Output>();
194 if let Err(e) = self.spawn_by_id(task_id.clone(), future) {
195 return Err(e);
196 }
197
198 Ok(task_id)
199 }
200
201 fn spawn_local<F>(&self, future: F) -> Result<TaskId>
202 where
203 F: Future<Output = O> + 'static {
204 let task_id = self.alloc::<F::Output>();
205 if let Err(e) = self.spawn_local_by_id(task_id.clone(), future) {
206 return Err(e);
207 }
208
209 Ok(task_id)
210 }
211
212 fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
213 where
214 F: Future<Output = O> + 'static {
215 let task_id = self.alloc::<F::Output>();
216 if let Err(e) = self.spawn_priority_by_id(task_id.clone(), priority, future) {
217 return Err(e);
218 }
219
220 Ok(task_id)
221 }
222
223 fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
224 where
225 F: Future<Output = O> + 'static {
226 let task_id = self.alloc::<F::Output>();
227 if let Err(e) = self.spawn_yield_by_id(task_id.clone(), future) {
228 return Err(e);
229 }
230
231 Ok(task_id)
232 }
233
234 fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
235 where
236 F: Future<Output = O> + 'static {
237 let task_id = self.alloc::<F::Output>();
238 if let Err(e) = self.spawn_timing_by_id(task_id.clone(), future, time) {
239 return Err(e);
240 }
241
242 Ok(task_id)
243 }
244
245 fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
246 where
247 F: Future<Output = O> + 'static {
248 self.spawn_local_by_id(task_id, future)
249 }
250
251 fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
252 where
253 F: Future<Output = O> + 'static {
254 if let Err(e) = self.0.pool.push_local(Arc::new(AsyncTask::new(
255 task_id,
256 self.0.pool.clone(),
257 DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
258 Some(future.boxed_local()),
259 ))) {
260 return Err(Error::new(ErrorKind::Other, e));
261 }
262
263 Ok(())
264 }
265
266 fn spawn_priority_by_id<F>(&self,
267 task_id: TaskId,
268 _priority: usize,
269 future: F) -> Result<()>
270 where
271 F: Future<Output = O> + 'static {
272 if let Err(e) = self.0.pool.push_priority(DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
273 Arc::new(AsyncTask::new(
274 task_id,
275 self.0.pool.clone(),
276 DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
277 Some(future.boxed_local()),
278 ))) {
279 return Err(Error::new(ErrorKind::Other, e));
280 }
281
282 Ok(())
283 }
284
285 #[inline]
287 fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
288 where
289 F: Future<Output = O> + 'static {
290 self.spawn_priority_by_id(task_id,
291 DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
292 future)
293 }
294
295 fn spawn_timing_by_id<F>(&self,
297 _task_id: TaskId,
298 _future: F,
299 _time: usize) -> Result<()>
300 where
301 F: Future<Output = O> + 'static {
302 Err(Error::new(ErrorKind::Other, "unimplemented"))
303 }
304
305 fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output> {
307 unimplemented!()
308 }
309
310 fn wakeup<Output: 'static>(&self, task_id: &TaskId) {
312 unimplemented!()
313 }
314
315 fn wait<V: 'static>(&self) -> AsyncWait<V> {
317 AsyncWait::new(self.wait_any(2))
318 }
319
320 fn wait_any<V: 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
322 let (producor, consumer) = async_bounded(capacity);
323
324 AsyncWaitAny::new(capacity, producor, consumer)
325 }
326
327 fn wait_any_callback<V: 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
329 let (producor, consumer) = async_bounded(capacity);
330
331 AsyncWaitAnyCallback::new(capacity, producor, consumer)
332 }
333
334 fn map_reduce<V: 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
336 let (producor, consumer) = async_bounded(capacity);
337
338 AsyncMapReduce::new(0, capacity, producor, consumer)
339 }
340
341 fn timeout(&self, _timeout: usize) -> LocalBoxFuture<'static, ()> {
343 unimplemented!()
344 }
345
346 fn yield_now(&self) -> LocalBoxFuture<'static, ()> {
348 async move {
349 YieldNow(false).await;
350 }.boxed_local()
351 }
352
353 fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> LocalBoxStream<'static, FO>
355 where
356 S: Stream<Item = SO> + 'static,
357 SO: 'static,
358 F: FnMut(SO) -> AsyncPipelineResult<FO> + 'static,
359 FO: 'static,
360 {
361 let output = stream! {
362 for await value in input {
363 match filter(value) {
364 AsyncPipelineResult::Disconnect => {
365 break;
367 },
368 AsyncPipelineResult::Filtered(result) => {
369 yield result;
370 },
371 }
372 }
373 };
374
375 output.boxed_local()
376 }
377
378 fn close(&self) -> bool {
380 if cfg!(target_arch = "aarch64") {
381 if let Ok(true) =
382 self
383 .0
384 .running
385 .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
386 {
387 true
389 } else {
390 false
391 }
392 } else {
393 if let Ok(true) =
394 self
395 .0
396 .running
397 .compare_exchange_weak(true, false, Ordering::SeqCst, Ordering::SeqCst)
398 {
399 true
401 } else {
402 false
403 }
404 }
405 }
406}
407
408impl<O: Default + 'static> AsyncRuntimeExt<O> for LocalTaskRuntime<O> {
409 fn spawn_with_context<F, C>(&self,
410 _task_id: TaskId,
411 _future: F,
412 _context: C) -> Result<()>
413 where F: Future<Output = O> + 'static,
414 C: 'static {
415 Err(Error::new(ErrorKind::Other, "unimplemented"))
416 }
417
418 fn spawn_timing_with_context<F, C>(&self,
420 task_id: TaskId,
421 future: F,
422 context: C,
423 time: usize) -> Result<()>
424 where F: Future<Output = O> + 'static,
425 C: 'static {
426 Err(Error::new(ErrorKind::Other, "unimplemented"))
427 }
428
429 fn block_on<F>(&self, future: F) -> Result<F::Output>
431 where F: Future + 'static,
432 <F as Future>::Output: Default + 'static {
433 let runner = LocalTaskRunner(self.clone());
434 let mut result: Option<<F as Future>::Output> = None;
435 let result_ptr = (&mut result) as *mut Option<<F as Future>::Output>;
436
437 self.spawn_local(async move {
438 let r = future.await;
440 unsafe {
441 *result_ptr = Some(r);
442 }
443
444 Default::default()
445 });
446
447 loop {
448 while self.internal_len() > 0 {
450 runner.run_once();
451 }
452
453 if let Some(result) = result.take() {
455 return Ok(result);
457 }
458 }
459 }
460}
461
462impl<O: Default + 'static> LocalTaskRuntime<O> {
463 #[inline]
465 pub fn is_running(&self) -> bool {
466 self
467 .0
468 .running
469 .load(Ordering::Relaxed)
470 }
471
472 #[inline]
474 pub fn internal_len(&self) -> usize {
475 self.0.pool.internal_len()
476 }
477
478 #[inline]
480 pub(crate) fn will_wakeup(&self, task: Arc<AsyncTask<<Self as AsyncRuntime<O>>::Pool, O>>) {
481 self.0.pool.will_wakeup(task);
482 }
483
484 pub fn send<F>(&self, future: F)
486 where
487 F: Future<Output = O> + 'static,
488 {
489 let task_id = self.alloc::<F::Output>();
490 self.0.pool.push(Arc::new(AsyncTask::new(
491 task_id,
492 self.0.pool.clone(),
493 DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
494 Some(future.boxed_local()),
495 )));
496 }
497}
498
499pub struct LocalTaskRunner<O: Default + 'static = ()>(LocalTaskRuntime<O>);
503
504unsafe impl<O: Default + 'static> Send for LocalTaskRunner<O> {}
505impl<O: Default + 'static> !Sync for LocalTaskRunner<O> {}
506
507impl<O: Default + 'static> LocalTaskRunner<O> {
508 pub fn new() -> Self {
510 let inner = InnerLocalTaskRuntime {
511 uid: alloc_rt_uid(),
512 running: Arc::new(AtomicBool::new(false)),
513 pool: Arc::new(LocalTaskPool::new()),
514 };
515
516 LocalTaskRunner(LocalTaskRuntime(Arc::new(inner)))
517 }
518
519 pub fn get_runtime(&self) -> LocalTaskRuntime<O> {
521 self.0.clone()
522 }
523
524 pub fn startup(self,
526 thread_name: &str,
527 thread_stack_size: usize) -> LocalTaskRuntime<O> {
528 let rt = self.get_runtime();
529 let rt_copy = rt.clone();
530 let _ = thread::Builder::new()
531 .name(thread_name.to_string())
532 .stack_size(thread_stack_size)
533 .spawn(move || {
534 rt_copy
535 .0
536 .running
537 .store(true, Ordering::Relaxed);
538
539 while rt_copy.is_running() {
540 self.run_once();
541 }
542 });
543
544 rt
545 }
546
547 #[inline]
549 pub fn run_once(&self) {
550 unsafe {
551 if let Some(task) = (self.0).0.pool.try_pop() {
552 let waker = waker_ref(&task);
553 let mut context = Context::from_waker(&*waker);
554 if let Some(mut future) = task.get_inner() {
555 if let Poll::Pending = future.as_mut().poll(&mut context) {
556 task.set_inner(Some(future));
558 }
559 }
560 }
561 }
562 }
563
564 pub fn into_local(self) -> LocalTaskRuntime<O> {
566 self.0
567 }
568}
569
570#[test]
571fn test_local_compatible_wasm_runtime_block_on() {
572 struct AtomicCounter(AtomicUsize, Instant);
573 impl Drop for AtomicCounter {
574 fn drop(&mut self) {
575 {
576 println!(
577 "!!!!!!drop counter, count: {:?}, time: {:?}",
578 self.0.load(Ordering::Relaxed),
579 Instant::now() - self.1
580 );
581 }
582 }
583 }
584
585 let rt = LocalTaskRunner::<()>::new().into_local();
586
587 let counter = Arc::new(AtomicCounter(AtomicUsize::new(0), Instant::now()));
588 for _ in 0..10000000 {
589 let counter_copy = counter.clone();
590 let _ = rt.block_on(async move { counter_copy.0.fetch_add(1, Ordering::Relaxed) });
591 }
592}
593
594
595
596
597
598
599
600