1use std::thread;
21use std::sync::Arc;
22use std::vec::IntoIter;
23use std::future::Future;
24use std::cell::UnsafeCell;
25use std::task::{Context, Poll, Waker};
26use std::io::{Error, ErrorKind, Result};
27use std::collections::vec_deque::VecDeque;
28use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
29
30use async_stream::stream;
31use crossbeam_channel::Sender;
32use crossbeam_queue::SegQueue;
33use flume::bounded as async_bounded;
34use futures::{
35 future::{BoxFuture, FutureExt},
36 stream::{BoxStream, Stream, StreamExt},
37 task::waker_ref,
38};
39use parking_lot::{Condvar, Mutex};
40use quanta::Clock;
41
42use wrr::IWRRSelector;
43
44use super::{
45 PI_ASYNC_THREAD_LOCAL_ID, DEFAULT_MAX_HIGH_PRIORITY_BOUNDED, DEFAULT_HIGH_PRIORITY_BOUNDED, DEFAULT_MAX_LOW_PRIORITY_BOUNDED, alloc_rt_uid, AsyncMapReduce, AsyncPipelineResult, AsyncRuntime,
46 AsyncRuntimeExt, AsyncTask, AsyncTaskPool, AsyncTaskPoolExt, AsyncTaskTimer, AsyncWait,
47 AsyncWaitAny, AsyncWaitAnyCallback, AsyncWaitTimeout, LocalAsyncRuntime, TaskId, YieldNow
48};
49use crate::rt::{TaskHandle, AsyncTimingTask};
50
51pub struct SingleTaskPool<O: Default + 'static> {
55 id: usize, public: SegQueue<Arc<AsyncTask<SingleTaskPool<O>, O>>>, internal: UnsafeCell<VecDeque<Arc<AsyncTask<SingleTaskPool<O>, O>>>>, stack: UnsafeCell<Vec<Arc<AsyncTask<SingleTaskPool<O>, O>>>>, selector: UnsafeCell<IWRRSelector<2>>, consume_count: AtomicUsize, produce_count: AtomicUsize, thread_waker: Option<Arc<(AtomicBool, Mutex<()>, Condvar)>>, }
64
65unsafe impl<O: Default + 'static> Send for SingleTaskPool<O> {}
66unsafe impl<O: Default + 'static> Sync for SingleTaskPool<O> {}
67
68impl<O: Default + 'static> Default for SingleTaskPool<O> {
69 fn default() -> Self {
70 SingleTaskPool::new([1, 1])
71 }
72}
73
74impl<O: Default + 'static> AsyncTaskPool<O> for SingleTaskPool<O> {
75 type Pool = SingleTaskPool<O>;
76
77 #[inline]
78 fn get_thread_id(&self) -> usize {
79 let rt_uid = self.id;
80 match PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
81 let current = unsafe { *thread_id.get() };
82 if current == usize::MAX {
83 unsafe {
85 *thread_id.get() = rt_uid << 32;
86 *thread_id.get()
87 }
88 } else {
89 current
90 }
91 }) {
92 Err(e) => {
93 panic!(
95 "Get thread id failed, thread: {:?}, reason: {:?}",
96 thread::current(),
97 e
98 );
99 }
100 Ok(id) => id,
101 }
102 }
103
104 #[inline]
105 fn len(&self) -> usize {
106 if let Some(len) = self
107 .produce_count
108 .load(Ordering::Relaxed)
109 .checked_sub(self.consume_count.load(Ordering::Relaxed))
110 {
111 len
112 } else {
113 0
114 }
115 }
116
117 #[inline]
118 fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
119 self.public.push(task);
120 self.produce_count.fetch_add(1, Ordering::Relaxed);
121 Ok(())
122 }
123
124 #[inline]
125 fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
126 let id = self.get_thread_id();
127 let rt_uid = task.owner();
128 if (id >> 32) == rt_uid {
129 unsafe {{
131 (&mut *self.internal.get()).push_back(task);
132 }}
133 self.produce_count.fetch_add(1, Ordering::Relaxed);
134 Ok(())
135 } else {
136 self.push(task)
138 }
139 }
140
141 #[inline]
142 fn push_priority(&self,
143 priority: usize,
144 task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
145 if priority >= DEFAULT_MAX_HIGH_PRIORITY_BOUNDED {
146 let id = self.get_thread_id();
148 let rt_uid = task.owner();
149 if (id >> 32) == rt_uid {
150 unsafe {
152 let stack = (&mut *self.stack.get());
153 if stack
154 .capacity()
155 .checked_sub(stack.len())
156 .unwrap_or(0) >= 0 {
157 (&mut *self.stack.get()).push(task);
159 } else {
160 (&mut *self.internal.get()).push_back(task);
162 }
163 }
164
165 self.produce_count.fetch_add(1, Ordering::Relaxed);
166 Ok(())
167 } else {
168 self.push(task)
170 }
171 } else if priority >= DEFAULT_HIGH_PRIORITY_BOUNDED {
172 self.push_local(task)
174 } else {
175 self.push(task)
177 }
178 }
179
180 #[inline]
181 fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
182 self.push_priority(DEFAULT_HIGH_PRIORITY_BOUNDED, task)
183 }
184
185 #[inline]
186 fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>> {
187 let task = unsafe { (&mut *self
188 .stack
189 .get())
190 .pop()
191 };
192 if task.is_some() {
193 self.consume_count.fetch_add(1, Ordering::Relaxed);
195 return task;
196 }
197
198 let task = try_pop_by_weight(self);
200 if task.is_some() {
201 self
202 .consume_count
203 .fetch_add(1, Ordering::Relaxed);
204 }
205 task
206 }
207
208 #[inline]
209 fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>> {
210 let mut all = Vec::with_capacity(self.len());
211
212 let internal = unsafe { (&mut *self.internal.get()) };
213 for _ in 0..internal.len() {
214 if let Some(task) = internal.pop_front() {
215 all.push(task);
216 }
217 }
218
219 let public_len = self.public.len();
220 for _ in 0..public_len {
221 if let Some(task) = self.public.pop() {
222 all.push(task);
223 }
224 }
225
226 all.into_iter()
227 }
228
229 #[inline]
230 fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>> {
231 self.thread_waker.as_ref()
232 }
233}
234
235fn try_pop_by_weight<O: Default + 'static>(pool: &SingleTaskPool<O>)
237 -> Option<Arc<AsyncTask<SingleTaskPool<O>, O>>> {
238 unsafe {
239 match (&mut *pool.selector.get()).select() {
241 0 => {
242 let task = try_pop_external(pool);
244 if task.is_some() {
245 task
246 } else {
247 try_pop_internal(pool)
249 }
250 },
251 _ => {
252 let task = try_pop_internal(pool);
254 if task.is_some() {
255 task
256 } else {
257 try_pop_external(pool)
259 }
260 },
261 }
262 }
263}
264
265#[inline]
267fn try_pop_internal<O: Default + 'static>(pool: &SingleTaskPool<O>)
268 -> Option<Arc<AsyncTask<SingleTaskPool<O>, O>>> {
269 unsafe { (&mut *pool.internal.get()).pop_front() }
270}
271
272#[inline]
274fn try_pop_external<O: Default + 'static>(pool: &SingleTaskPool<O>)
275 -> Option<Arc<AsyncTask<SingleTaskPool<O>, O>>> {
276 pool.public.pop()
277}
278
279impl<O: Default + 'static> AsyncTaskPoolExt<O> for SingleTaskPool<O> {
280 fn set_thread_waker(&mut self, thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) {
281 self.thread_waker = Some(thread_waker);
282 }
283}
284
285impl<O: Default + 'static> SingleTaskPool<O> {
286 pub fn new(weights: [u8; 2]) -> Self {
288 let id = alloc_rt_uid();
289 let public = SegQueue::new();
290 let internal = UnsafeCell::new(VecDeque::new());
291 let stack = UnsafeCell::new(Vec::with_capacity(1));
292 let selector = UnsafeCell::new(IWRRSelector::new(weights));
293 let consume_count = AtomicUsize::new(0);
294 let produce_count = AtomicUsize::new(0);
295
296 SingleTaskPool {
297 id,
298 public,
299 internal,
300 stack,
301 selector,
302 consume_count,
303 produce_count,
304 thread_waker: Some(Arc::new((
305 AtomicBool::new(false),
306 Mutex::new(()),
307 Condvar::new(),
308 ))),
309 }
310 }
311}
312
313pub struct SingleTaskRuntime<
317 O: Default + 'static = (),
318 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = SingleTaskPool<O>,
319>(
320 Arc<(
321 usize, Arc<P>, Sender<(usize, AsyncTimingTask<P, O>)>, AsyncTaskTimer<P, O>, AtomicUsize, AtomicUsize, )>,
328);
329
330unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Send
331 for SingleTaskRuntime<O, P>
332{
333}
334unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Sync
335 for SingleTaskRuntime<O, P>
336{
337}
338
339impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Clone
340 for SingleTaskRuntime<O, P>
341{
342 fn clone(&self) -> Self {
343 SingleTaskRuntime(self.0.clone())
344 }
345}
346
347impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>> AsyncRuntime<O>
348 for SingleTaskRuntime<O, P>
349{
350 type Pool = P;
351
352 fn shared_pool(&self) -> Arc<Self::Pool> {
354 (self.0).1.clone()
355 }
356
357 fn get_id(&self) -> usize {
359 (self.0).0
360 }
361
362 fn wait_len(&self) -> usize {
364 (self.0)
365 .4
366 .load(Ordering::Relaxed)
367 .checked_sub((self.0).5.load(Ordering::Relaxed))
368 .unwrap_or(0)
369 }
370
371 fn len(&self) -> usize {
373 (self.0).1.len()
374 }
375
376 fn alloc<R: 'static>(&self) -> TaskId {
378 TaskId(UnsafeCell::new((TaskHandle::<R>::default().into_raw() as u128) << 64 | self.get_id() as u128 & 0xffffffffffffffff))
379 }
380
381 fn spawn<F>(&self, future: F) -> Result<TaskId>
383 where
384 F: Future<Output = O> + Send + 'static,
385 {
386 let task_id = self.alloc::<F::Output>();
387 if let Err(e) = self.spawn_by_id(task_id.clone(), future) {
388 return Err(e);
389 }
390
391 Ok(task_id)
392 }
393
394 fn spawn_local<F>(&self, future: F) -> Result<TaskId>
396 where
397 F: Future<Output = O> + Send + 'static {
398 let task_id = self.alloc::<F::Output>();
399 if let Err(e) = self.spawn_local_by_id(task_id.clone(), future) {
400 return Err(e);
401 }
402
403 Ok(task_id)
404 }
405
406 fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
408 where
409 F: Future<Output = O> + Send + 'static {
410 let task_id = self.alloc::<F::Output>();
411 if let Err(e) = self.spawn_priority_by_id(task_id.clone(), priority, future) {
412 return Err(e);
413 }
414
415 Ok(task_id)
416 }
417
418 fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
420 where
421 F: Future<Output = O> + Send + 'static {
422 let task_id = self.alloc::<F::Output>();
423 if let Err(e) = self.spawn_yield_by_id(task_id.clone(), future) {
424 return Err(e);
425 }
426
427 Ok(task_id)
428 }
429
430 fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
432 where
433 F: Future<Output = O> + Send + 'static,
434 {
435 let task_id = self.alloc::<F::Output>();
436 if let Err(e) = self.spawn_timing_by_id(task_id.clone(), future, time) {
437 return Err(e);
438 }
439
440 Ok(task_id)
441 }
442
443 fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
445 where
446 F: Future<Output = O> + Send + 'static {
447 if let Err(e) = (self.0).1.push(Arc::new(AsyncTask::new(
448 task_id,
449 (self.0).1.clone(),
450 DEFAULT_MAX_LOW_PRIORITY_BOUNDED,
451 Some(future.boxed()),
452 ))) {
453 return Err(Error::new(ErrorKind::Other, e));
454 }
455
456 Ok(())
457 }
458
459 fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
461 where
462 F: Future<Output = O> + Send + 'static {
463 (self.0).1.push_local(Arc::new(AsyncTask::new(
464 task_id,
465 (self.0).1.clone(),
466 DEFAULT_HIGH_PRIORITY_BOUNDED,
467 Some(future.boxed()))))
468 }
469
470 fn spawn_priority_by_id<F>(&self,
472 task_id: TaskId,
473 priority: usize,
474 future: F) -> Result<()>
475 where
476 F: Future<Output = O> + Send + 'static {
477 (self.0).1.push_priority(priority, Arc::new(AsyncTask::new(
478 task_id,
479 (self.0).1.clone(),
480 priority,
481 Some(future.boxed()))))
482 }
483
484 #[inline]
486 fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
487 where
488 F: Future<Output = O> + Send + 'static {
489 self.spawn_priority_by_id(task_id,
490 DEFAULT_HIGH_PRIORITY_BOUNDED,
491 future)
492 }
493
494 fn spawn_timing_by_id<F>(&self,
496 task_id: TaskId,
497 future: F,
498 time: usize) -> Result<()>
499 where
500 F: Future<Output = O> + Send + 'static {
501 let rt = self.clone();
502 self.spawn_by_id(task_id, async move {
503 (rt.0).3.set_timer(
504 AsyncTimingTask::WaitRun(Arc::new(AsyncTask::new(
505 rt.alloc::<F::Output>(),
506 (rt.0).1.clone(),
507 DEFAULT_HIGH_PRIORITY_BOUNDED,
508 Some(future.boxed()),
509 ))),
510 time,
511 );
512
513 (rt.0).4.fetch_add(1, Ordering::Relaxed);
514 Default::default()
515 })
516 }
517
518 fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output> {
520 task_id.set_waker::<Output>(waker);
521 Poll::Pending
522 }
523
524 fn wakeup<Output: 'static>(&self, task_id: &TaskId) {
526 task_id.wakeup::<Output>();
527 }
528
529 fn wait<V: Send + 'static>(&self) -> AsyncWait<V> {
531 AsyncWait(self.wait_any(2))
532 }
533
534 fn wait_any<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
536 let (producor, consumer) = async_bounded(capacity);
537
538 AsyncWaitAny {
539 capacity,
540 producor,
541 consumer,
542 }
543 }
544
545 fn wait_any_callback<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
547 let (producor, consumer) = async_bounded(capacity);
548
549 AsyncWaitAnyCallback {
550 capacity,
551 producor,
552 consumer,
553 }
554 }
555
556 fn map_reduce<V: Send + 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
558 let (producor, consumer) = async_bounded(capacity);
559
560 AsyncMapReduce {
561 count: 0,
562 capacity,
563 producor,
564 consumer,
565 }
566 }
567
568 fn timeout(&self, timeout: usize) -> BoxFuture<'static, ()> {
570 let rt = self.clone();
571 let producor = (self.0).2.clone();
572
573 AsyncWaitTimeout::new(rt, producor, timeout).boxed()
574 }
575
576 fn yield_now(&self) -> BoxFuture<'static, ()> {
578 async move {
579 YieldNow(false).await;
580 }.boxed()
581 }
582
583 fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> BoxStream<'static, FO>
585 where
586 S: Stream<Item = SO> + Send + 'static,
587 SO: Send + 'static,
588 F: FnMut(SO) -> AsyncPipelineResult<FO> + Send + 'static,
589 FO: Send + 'static,
590 {
591 let output = stream! {
592 for await value in input {
593 match filter(value) {
594 AsyncPipelineResult::Disconnect => {
595 break;
597 },
598 AsyncPipelineResult::Filtered(result) => {
599 yield result;
600 },
601 }
602 }
603 };
604
605 output.boxed()
606 }
607
608 fn close(&self) -> bool {
610 false
611 }
612}
613
614impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>> AsyncRuntimeExt<O>
615 for SingleTaskRuntime<O, P>
616{
617 fn spawn_with_context<F, C>(&self, task_id: TaskId, future: F, context: C) -> Result<()>
618 where
619 F: Future<Output = O> + Send + 'static,
620 C: 'static,
621 {
622 if let Err(e) = (self.0).1.push(Arc::new(AsyncTask::with_context(
623 task_id,
624 (self.0).1.clone(),
625 DEFAULT_MAX_LOW_PRIORITY_BOUNDED,
626 Some(future.boxed()),
627 context,
628 ))) {
629 return Err(Error::new(ErrorKind::Other, e));
630 }
631
632 Ok(())
633 }
634
635 fn spawn_timing_with_context<F, C>(
636 &self,
637 task_id: TaskId,
638 future: F,
639 context: C,
640 time: usize,
641 ) -> Result<()>
642 where
643 F: Future<Output = O> + Send + 'static,
644 C: Send + 'static,
645 {
646 let rt = self.clone();
647 self.spawn_by_id(task_id, async move {
648 (rt.0).3.set_timer(
649 AsyncTimingTask::WaitRun(Arc::new(AsyncTask::with_context(
650 rt.alloc::<F::Output>(),
651 (rt.0).1.clone(),
652 DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
653 Some(future.boxed()),
654 context,
655 ))),
656 time,
657 );
658
659 (rt.0).4.fetch_add(1, Ordering::Relaxed);
660 Default::default()
661 })
662 }
663
664 fn block_on<F>(&self, future: F) -> Result<F::Output>
665 where
666 F: Future + Send + 'static,
667 <F as Future>::Output: Default + Send + 'static,
668 {
669 let runner = SingleTaskRunner {
670 is_running: AtomicBool::new(true),
671 runtime: self.clone(),
672 clock: Clock::new(),
673 };
674 let mut result: Option<<F as Future>::Output> = None;
675 let result_raw = (&mut result) as *mut Option<<F as Future>::Output> as usize;
676
677 self.spawn(async move {
678 let r = future.await;
680 unsafe {
681 *(result_raw as *mut Option<<F as Future>::Output>) = Some(r);
682 }
683
684 Default::default()
685 });
686
687 loop {
688 while runner.run()? > 0 {}
690
691 if let Some(result) = result.take() {
693 return Ok(result);
695 }
696 }
697 }
698}
699
700impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>
701 SingleTaskRuntime<O, P>
702{
703 pub fn to_local_runtime(&self) -> LocalAsyncRuntime<O> {
705 LocalAsyncRuntime {
706 inner: self.as_raw(),
707 get_id_func: SingleTaskRuntime::<O, P>::get_id_raw,
708 spawn_func: SingleTaskRuntime::<O, P>::spawn_raw,
709 spawn_timing_func: SingleTaskRuntime::<O, P>::spawn_timing_raw,
710 timeout_func: SingleTaskRuntime::<O, P>::timeout_raw,
711 }
712 }
713
714 #[inline]
716 pub(crate) fn as_raw(&self) -> *const () {
717 Arc::into_raw(self.0.clone()) as *const ()
718 }
719
720 #[inline]
722 pub(crate) fn from_raw(raw: *const ()) -> Self {
723 let inner = unsafe {
724 Arc::from_raw(
725 raw as *const (
726 usize,
727 Arc<P>,
728 Sender<(usize, AsyncTimingTask<P, O>)>,
729 AsyncTaskTimer<P, O>,
730 AtomicUsize,
731 AtomicUsize,
732 ),
733 )
734 };
735 SingleTaskRuntime(inner)
736 }
737
738 pub(crate) fn get_id_raw(raw: *const ()) -> usize {
740 let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
741 let id = rt.get_id();
742 Arc::into_raw(rt.0); id
744 }
745
746 pub(crate) fn spawn_raw(raw: *const (), future: BoxFuture<'static, O>) -> Result<()> {
748 let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
749 let result = rt.spawn_by_id(rt.alloc::<O>(), future);
750 Arc::into_raw(rt.0); result
752 }
753
754 pub(crate) fn spawn_timing_raw(
756 raw: *const (),
757 future: BoxFuture<'static, O>,
758 timeout: usize,
759 ) -> Result<()> {
760 let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
761 let result = rt.spawn_timing_by_id(rt.alloc::<O>(), future, timeout);
762 Arc::into_raw(rt.0); result
764 }
765
766 pub(crate) fn timeout_raw(raw: *const (), timeout: usize) -> BoxFuture<'static, ()> {
768 let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
769 let boxed = rt.timeout(timeout);
770 Arc::into_raw(rt.0); boxed
772 }
773}
774
775pub struct SingleTaskRunner<
779 O: Default + 'static,
780 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = SingleTaskPool<O>,
781> {
782 is_running: AtomicBool, runtime: SingleTaskRuntime<O, P>, clock: Clock, }
786
787unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Send
788 for SingleTaskRunner<O, P>
789{
790}
791unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Sync
792 for SingleTaskRunner<O, P>
793{
794}
795
796impl<O: Default + 'static> Default for SingleTaskRunner<O> {
797 fn default() -> Self {
798 SingleTaskRunner::new(SingleTaskPool::default())
799 }
800}
801
802impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>
803 SingleTaskRunner<O, P>
804{
805 pub fn new(pool: P) -> Self {
807 let rt_uid = pool.get_thread_id() >> 32;
808 let pool = Arc::new(pool);
809
810 let timer = AsyncTaskTimer::new();
812 let producor = timer.producor.clone();
813 let timer_producor_count = AtomicUsize::new(0);
814 let timer_consume_count = AtomicUsize::new(0);
815
816 let runtime = SingleTaskRuntime(Arc::new((rt_uid,
818 pool,
819 producor,
820 timer,
821 timer_producor_count,
822 timer_consume_count)));
823
824 SingleTaskRunner {
825 is_running: AtomicBool::new(false),
826 runtime,
827 clock: Clock::new(),
828 }
829 }
830
831 pub fn get_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
833 (self.runtime.0).1.get_thread_waker().cloned()
834 }
835
836 pub fn startup(&self) -> Option<SingleTaskRuntime<O, P>> {
838 if cfg!(target_arch = "aarch64") {
839 match self
840 .is_running
841 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
842 {
843 Ok(false) => {
844 Some(self.runtime.clone())
846 }
847 _ => {
848 None
850 }
851 }
852 } else {
853 match self.is_running.compare_exchange_weak(
854 false,
855 true,
856 Ordering::SeqCst,
857 Ordering::SeqCst,
858 ) {
859 Ok(false) => {
860 Some(self.runtime.clone())
862 }
863 _ => {
864 None
866 }
867 }
868 }
869 }
870
871 pub fn run_once(&self) -> Result<usize> {
873 if !self.is_running.load(Ordering::Relaxed) {
874 return Err(Error::new(
876 ErrorKind::Other,
877 "Single thread runtime not running",
878 ));
879 }
880
881 let mut pop_len = 0;
883 (self.runtime.0)
884 .4
885 .fetch_add((self.runtime.0).3.consume(),
886 Ordering::Relaxed);
887 loop {
888 let current_time = (self.runtime.0).3.is_require_pop();
889 if let Some(current_time) = current_time {
890 let timed_out = (self.runtime.0).3.pop(current_time);
892 if let Some((handle, timing_task)) = timed_out {
893 match timing_task {
894 AsyncTimingTask::Pended(expired) => {
895 self.runtime.wakeup::<O>(&expired);
897 if let Some(task) = (self.runtime.0).1.try_pop() {
898 run_task(task);
899 }
900 }
901 AsyncTimingTask::WaitRun(expired) => {
902 (self.runtime.0).1.push_priority(handle, expired);
904 if let Some(task) = (self.runtime.0).1.try_pop() {
905 run_task(task);
906 }
907 }
908 }
909 pop_len += 1;
910 }
911 } else {
912 break;
914 }
915 }
916 (self.runtime.0)
917 .5
918 .fetch_add(pop_len,
919 Ordering::Relaxed);
920
921 match (self.runtime.0).1.try_pop() {
923 None => {
924 return Ok(0);
926 }
927 Some(task) => {
928 run_task(task);
929 }
930 }
931
932 Ok((self.runtime.0).1.len())
933 }
934
935 pub fn run(&self) -> Result<usize> {
937 if !self.is_running.load(Ordering::Relaxed) {
938 return Err(Error::new(
940 ErrorKind::Other,
941 "Single thread runtime not running",
942 ));
943 }
944
945 loop {
946 let mut pop_len = 0;
948 let mut start_run_millis = self.clock.recent(); (self.runtime.0)
950 .4
951 .fetch_add((self.runtime.0).3.consume(),
952 Ordering::Relaxed);
953 loop {
954 let current_time = (self.runtime.0).3.is_require_pop();
955 if let Some(current_time) = current_time {
956 let timed_out = (self.runtime.0).3.pop(current_time);
958 if let Some((handle, timing_task)) = timed_out {
959 match timing_task {
960 AsyncTimingTask::Pended(expired) => {
961 self.runtime.wakeup::<O>(&expired);
963 if let Some(task) = (self.runtime.0).1.try_pop() {
964 run_task(task);
965 }
966 }
967 AsyncTimingTask::WaitRun(expired) => {
968 (self.runtime.0).1.push_priority(handle, expired);
970 if let Some(task) = (self.runtime.0).1.try_pop() {
971 run_task(task);
972 }
973 }
974 }
975 pop_len += 1;
976 }
977 } else {
978 break;
980 }
981 }
982 (self.runtime.0)
983 .5
984 .fetch_add(pop_len,
985 Ordering::Relaxed);
986
987 while self
989 .clock
990 .recent()
991 .duration_since(start_run_millis)
992 .as_millis() < 1 {
993 match (self.runtime.0).1.try_pop() {
994 None => {
995 return Ok((self.runtime.0).1.len());
997 }
998 Some(task) => {
999 run_task(task);
1000 }
1001 }
1002 }
1003 }
1004 }
1005
1006 pub fn into_local(self) -> SingleTaskRuntime<O, P> {
1008 self.runtime
1009 }
1010}
1011
1012#[inline]
1014fn run_task<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>(
1015 task: Arc<AsyncTask<P, O>>,
1016) {
1017 let waker = waker_ref(&task);
1018 let mut context = Context::from_waker(&*waker);
1019 if let Some(mut future) = task.get_inner() {
1020 if let Poll::Pending = future.as_mut().poll(&mut context) {
1021 task.set_inner(Some(future));
1023 }
1024 }
1025}