1use std::thread;
15use std::sync::Arc;
16use std::vec::IntoIter;
17use std::future::Future;
18use std::cell::UnsafeCell;
19use std::io::{Error, ErrorKind, Result};
20use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
21use std::task::{Context, Poll, Waker};
22use std::collections::vec_deque::VecDeque;
23
24use async_stream::stream;
25use crossbeam_channel::Sender;
26use crossbeam_queue::SegQueue;
27use flume::bounded as async_bounded;
28use futures::{
29 future::{FutureExt, LocalBoxFuture},
30 stream::{LocalBoxStream, Stream, StreamExt},
31 task::waker_ref,
32};
33use parking_lot::{Condvar, Mutex};
34use quanta::Clock;
35
36use wrr::IWRRSelector;
37
38use crate::{
39 rt::{
40 PI_ASYNC_THREAD_LOCAL_ID, DEFAULT_MAX_HIGH_PRIORITY_BOUNDED, DEFAULT_HIGH_PRIORITY_BOUNDED, DEFAULT_MAX_LOW_PRIORITY_BOUNDED, alloc_rt_uid,
41 serial::{AsyncMapReduce, AsyncRuntime, AsyncRuntimeExt,
42 AsyncTask, AsyncTaskPool, AsyncTaskPoolExt, AsyncTaskTimer, AsyncTimingTask, AsyncWait,
43 AsyncWaitAny, AsyncWaitAnyCallback, AsyncWaitTimeout,
44 LocalAsyncRuntime,
45 },
46 AsyncPipelineResult, TaskId, TaskHandle, YieldNow
47 },
48};
49
50pub struct SingleTaskPool<O: Default + 'static> {
54 id: usize, public: SegQueue<Arc<AsyncTask<SingleTaskPool<O>, O>>>, internal: UnsafeCell<VecDeque<Arc<AsyncTask<SingleTaskPool<O>, O>>>>, stack: UnsafeCell<Vec<Arc<AsyncTask<SingleTaskPool<O>, O>>>>, selector: UnsafeCell<IWRRSelector<2>>, consume_count: AtomicUsize, produce_count: AtomicUsize, thread_waker: Option<Arc<(AtomicBool, Mutex<()>, Condvar)>>, }
63
64unsafe impl<O: Default + 'static> Send for SingleTaskPool<O> {}
65unsafe impl<O: Default + 'static> Sync for SingleTaskPool<O> {}
66
67impl<O: Default + 'static> Default for SingleTaskPool<O> {
68 fn default() -> Self {
69 SingleTaskPool::new([1, 1])
70 }
71}
72
73impl<O: Default + 'static> AsyncTaskPool<O> for SingleTaskPool<O> {
74 type Pool = SingleTaskPool<O>;
75
76 #[inline]
77 fn get_thread_id(&self) -> usize {
78 let rt_uid = self.id;
79 match PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
80 let current = unsafe { *thread_id.get() };
81 if current == usize::MAX {
82 unsafe {
84 *thread_id.get() = rt_uid << 32;
85 *thread_id.get()
86 }
87 } else {
88 current
89 }
90 }) {
91 Err(e) => {
92 panic!(
94 "Get thread id failed, thread: {:?}, reason: {:?}",
95 thread::current(),
96 e
97 );
98 }
99 Ok(id) => id,
100 }
101 }
102
103 #[inline]
104 fn len(&self) -> usize {
105 if let Some(len) = self
106 .produce_count
107 .load(Ordering::Relaxed)
108 .checked_sub(self.consume_count.load(Ordering::Relaxed))
109 {
110 len
111 } else {
112 0
113 }
114 }
115
116 #[inline]
117 fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
118 self.public.push(task);
119 self.produce_count.fetch_add(1, Ordering::Relaxed);
120 Ok(())
121 }
122
123 #[inline]
124 fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
125 let id = self.get_thread_id();
126 let rt_uid = task.owner();
127 if (id >> 32) == rt_uid {
128 unsafe {{
130 (&mut *self.internal.get()).push_back(task);
131 }}
132 self.produce_count.fetch_add(1, Ordering::Relaxed);
133 Ok(())
134 } else {
135 self.push(task)
137 }
138 }
139
140 #[inline]
141 fn push_priority(&self,
142 priority: usize,
143 task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
144 if priority >= DEFAULT_MAX_HIGH_PRIORITY_BOUNDED {
145 let id = self.get_thread_id();
147 let rt_uid = task.owner();
148 if (id >> 32) == rt_uid {
149 unsafe {
151 let stack = (&mut *self.stack.get());
152 if stack
153 .capacity()
154 .checked_sub(stack.len())
155 .unwrap_or(0) >= 0 {
156 (&mut *self.stack.get()).push(task);
158 } else {
159 (&mut *self.internal.get()).push_back(task);
161 }
162 }
163
164 self.produce_count.fetch_add(1, Ordering::Relaxed);
165 Ok(())
166 } else {
167 self.push(task)
169 }
170 } else if priority >= DEFAULT_HIGH_PRIORITY_BOUNDED {
171 self.push_local(task)
173 } else {
174 self.push(task)
176 }
177 }
178
179 #[inline]
180 fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
181 self.push_priority(DEFAULT_HIGH_PRIORITY_BOUNDED, task)
182 }
183
184 #[inline]
185 fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>> {
186 let task = unsafe { (&mut *self
187 .stack
188 .get())
189 .pop()
190 };
191 if task.is_some() {
192 self.consume_count.fetch_add(1, Ordering::Relaxed);
194 return task;
195 }
196
197 let task = try_pop_by_weight(self);
199 if task.is_some() {
200 self
201 .consume_count
202 .fetch_add(1, Ordering::Relaxed);
203 }
204 task
205 }
206
207 #[inline]
208 fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>> {
209 let mut all = Vec::with_capacity(self.len());
210
211 let internal = unsafe { (&mut *self.internal.get()) };
212 for _ in 0..internal.len() {
213 if let Some(task) = internal.pop_front() {
214 all.push(task);
215 }
216 }
217
218 let public_len = self.public.len();
219 for _ in 0..public_len {
220 if let Some(task) = self.public.pop() {
221 all.push(task);
222 }
223 }
224
225 all.into_iter()
226 }
227
228 #[inline]
229 fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>> {
230 self.thread_waker.as_ref()
231 }
232}
233
234fn try_pop_by_weight<O: Default + 'static>(pool: &SingleTaskPool<O>)
236 -> Option<Arc<AsyncTask<SingleTaskPool<O>, O>>> {
237 unsafe {
238 match (&mut *pool.selector.get()).select() {
240 0 => {
241 let task = try_pop_external(pool);
243 if task.is_some() {
244 task
245 } else {
246 try_pop_internal(pool)
248 }
249 },
250 _ => {
251 let task = try_pop_internal(pool);
253 if task.is_some() {
254 task
255 } else {
256 try_pop_external(pool)
258 }
259 },
260 }
261 }
262}
263
264#[inline]
266fn try_pop_internal<O: Default + 'static>(pool: &SingleTaskPool<O>)
267 -> Option<Arc<AsyncTask<SingleTaskPool<O>, O>>> {
268 unsafe { (&mut *pool.internal.get()).pop_front() }
269}
270
271#[inline]
273fn try_pop_external<O: Default + 'static>(pool: &SingleTaskPool<O>)
274 -> Option<Arc<AsyncTask<SingleTaskPool<O>, O>>> {
275 pool.public.pop()
276}
277
278impl<O: Default + 'static> AsyncTaskPoolExt<O> for SingleTaskPool<O> {
279 fn set_thread_waker(&mut self, thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) {
280 self.thread_waker = Some(thread_waker);
281 }
282}
283
284impl<O: Default + 'static> SingleTaskPool<O> {
285 pub fn new(weights: [u8; 2]) -> Self {
287 let rt_uid = alloc_rt_uid();
288 let public = SegQueue::new();
289 let internal = UnsafeCell::new(VecDeque::new());
290 let stack = UnsafeCell::new(Vec::with_capacity(1));
291 let selector = UnsafeCell::new(IWRRSelector::new(weights));
292 let consume_count = AtomicUsize::new(0);
293 let produce_count = AtomicUsize::new(0);
294
295 SingleTaskPool {
296 id: (rt_uid << 8) & 0xffff | 1,
297 public,
298 internal,
299 stack,
300 selector,
301 consume_count,
302 produce_count,
303 thread_waker: Some(Arc::new((
304 AtomicBool::new(false),
305 Mutex::new(()),
306 Condvar::new(),
307 ))),
308 }
309 }
310}
311
312pub struct SingleTaskRuntime<
316 O: Default + 'static = (),
317 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = SingleTaskPool<O>,
318>(
319 Arc<(
320 usize, Arc<P>, Sender<(usize, AsyncTimingTask<P, O>)>, AsyncTaskTimer<P, O>, AtomicUsize, AtomicUsize, )>,
327);
328
329unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Send
330 for SingleTaskRuntime<O, P>
331{
332}
333unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Sync
334 for SingleTaskRuntime<O, P>
335{
336}
337
338impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Clone
339 for SingleTaskRuntime<O, P>
340{
341 fn clone(&self) -> Self {
342 SingleTaskRuntime(self.0.clone())
343 }
344}
345
346impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>> AsyncRuntime<O>
347 for SingleTaskRuntime<O, P>
348{
349 type Pool = P;
350
351 fn shared_pool(&self) -> Arc<Self::Pool> {
353 (self.0).1.clone()
354 }
355
356 fn get_id(&self) -> usize {
358 (self.0).0
359 }
360
361 fn wait_len(&self) -> usize {
363 (self.0)
364 .4
365 .load(Ordering::Relaxed)
366 .checked_sub((self.0).5.load(Ordering::Relaxed))
367 .unwrap_or(0)
368 }
369
370 fn len(&self) -> usize {
372 (self.0).1.len()
373 }
374
375 fn alloc<R: 'static>(&self) -> TaskId {
377 TaskId(UnsafeCell::new((TaskHandle::<R>::default().into_raw() as u128) << 64 | self.get_id() as u128 & 0xffffffffffffffff))
378 }
379
380 fn spawn<F>(&self, future: F) -> Result<TaskId>
382 where
383 F: Future<Output = O> + 'static {
384 let task_id = self.alloc::<F::Output>();
385 if let Err(e) = self.spawn_by_id(task_id.clone(), future) {
386 return Err(e);
387 }
388
389 Ok(task_id)
390 }
391
392 fn spawn_local<F>(&self, future: F) -> Result<TaskId>
394 where
395 F: Future<Output = O> + 'static {
396 let task_id = self.alloc::<F::Output>();
397 if let Err(e) = self.spawn_local_by_id(task_id.clone(), future) {
398 return Err(e);
399 }
400
401 Ok(task_id)
402 }
403
404 fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
406 where
407 F: Future<Output = O> + 'static {
408 let task_id = self.alloc::<F::Output>();
409 if let Err(e) = self.spawn_priority_by_id(task_id.clone(), priority, future) {
410 return Err(e);
411 }
412
413 Ok(task_id)
414 }
415
416 fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
418 where
419 F: Future<Output = O> + 'static {
420 let task_id = self.alloc::<F::Output>();
421 if let Err(e) = self.spawn_yield_by_id(task_id.clone(), future) {
422 return Err(e);
423 }
424
425 Ok(task_id)
426 }
427
428 fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
430 where
431 F: Future<Output = O> + 'static {
432 let task_id = self.alloc::<F::Output>();
433 if let Err(e) = self.spawn_timing_by_id(task_id.clone(), future, time) {
434 return Err(e);
435 }
436
437 Ok(task_id)
438 }
439
440 fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
442 where
443 F: Future<Output = O> + 'static {
444 if let Err(e) = (self.0).1.push(Arc::new(AsyncTask::new(
445 task_id,
446 (self.0).1.clone(),
447 DEFAULT_MAX_LOW_PRIORITY_BOUNDED,
448 Some(future.boxed_local()),
449 ))) {
450 return Err(Error::new(ErrorKind::Other, e));
451 }
452
453 Ok(())
454 }
455
456 fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
458 where
459 F: Future<Output = O> + 'static {
460 (self.0).1.push_local(Arc::new(AsyncTask::new(
461 task_id,
462 (self.0).1.clone(),
463 DEFAULT_HIGH_PRIORITY_BOUNDED,
464 Some(future.boxed_local()))))
465 }
466
467 fn spawn_priority_by_id<F>(&self,
469 task_id: TaskId,
470 priority: usize,
471 future: F) -> Result<()>
472 where
473 F: Future<Output = O> + 'static {
474 (self.0).1.push_priority(priority, Arc::new(AsyncTask::new(
475 task_id,
476 (self.0).1.clone(),
477 priority,
478 Some(future.boxed_local()))))
479 }
480
481 #[inline]
483 fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
484 where
485 F: Future<Output = O> + 'static {
486 self.spawn_priority_by_id(task_id,
487 DEFAULT_HIGH_PRIORITY_BOUNDED,
488 future)
489 }
490
491 fn spawn_timing_by_id<F>(&self,
493 task_id: TaskId,
494 future: F,
495 time: usize) -> Result<()>
496 where
497 F: Future<Output = O> + 'static {
498 let rt = self.clone();
499 self.spawn_by_id(task_id, async move {
500 (rt.0).3.set_timer(
501 AsyncTimingTask::WaitRun(Arc::new(AsyncTask::new(
502 rt.alloc::<F::Output>(),
503 (rt.0).1.clone(),
504 DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
505 Some(future.boxed_local()),
506 ))),
507 time,
508 );
509
510 (rt.0).4.fetch_add(1, Ordering::Relaxed);
511 Default::default()
512 })
513 }
514
515 fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output> {
517 task_id.set_waker::<Output>(waker);
518 Poll::Pending
519 }
520
521 fn wakeup<Output: 'static>(&self, task_id: &TaskId) {
523 task_id.wakeup::<Output>();
524 }
525
526 fn wait<V: 'static>(&self) -> AsyncWait<V> {
528 AsyncWait::new(self.wait_any(2))
529 }
530
531 fn wait_any<V: 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
533 let (producor, consumer) = async_bounded(capacity);
534
535 AsyncWaitAny::new(capacity, producor, consumer)
536 }
537
538 fn wait_any_callback<V: 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
540 let (producor, consumer) = async_bounded(capacity);
541
542 AsyncWaitAnyCallback::new(capacity, producor, consumer)
543 }
544
545 fn map_reduce<V: 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
547 let (producor, consumer) = async_bounded(capacity);
548
549 AsyncMapReduce::new(0, capacity, producor, consumer)
550 }
551
552 fn timeout(&self, timeout: usize) -> LocalBoxFuture<'static, ()> {
554 let rt = self.clone();
555 let producor = (self.0).2.clone();
556
557 AsyncWaitTimeout::new(rt, producor, timeout).boxed_local()
558 }
559
560 fn yield_now(&self) -> LocalBoxFuture<'static, ()> {
562 async move {
563 YieldNow(false).await;
564 }.boxed_local()
565 }
566
567 fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> LocalBoxStream<'static, FO>
569 where
570 S: Stream<Item = SO> + 'static,
571 SO: 'static,
572 F: FnMut(SO) -> AsyncPipelineResult<FO> + 'static,
573 FO: 'static,
574 {
575 let output = stream! {
576 for await value in input {
577 match filter(value) {
578 AsyncPipelineResult::Disconnect => {
579 break;
581 },
582 AsyncPipelineResult::Filtered(result) => {
583 yield result;
584 },
585 }
586 }
587 };
588
589 output.boxed_local()
590 }
591
592 fn close(&self) -> bool {
594 false
595 }
596}
597
598impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>> AsyncRuntimeExt<O>
599 for SingleTaskRuntime<O, P>
600{
601 fn spawn_with_context<F, C>(&self, task_id: TaskId, future: F, context: C) -> Result<()>
602 where
603 F: Future<Output = O> + 'static,
604 C: 'static,
605 {
606 if let Err(e) = (self.0).1.push(Arc::new(AsyncTask::with_context(
607 task_id,
608 (self.0).1.clone(),
609 DEFAULT_MAX_LOW_PRIORITY_BOUNDED,
610 Some(future.boxed_local()),
611 context,
612 ))) {
613 return Err(Error::new(ErrorKind::Other, e));
614 }
615
616 Ok(())
617 }
618
619 fn spawn_timing_with_context<F, C>(
620 &self,
621 task_id: TaskId,
622 future: F,
623 context: C,
624 time: usize,
625 ) -> Result<()>
626 where
627 F: Future<Output = O> + 'static,
628 C: 'static,
629 {
630 let rt = self.clone();
631 self.spawn_by_id(task_id, async move {
632 (rt.0).3.set_timer(
633 AsyncTimingTask::WaitRun(Arc::new(AsyncTask::with_context(
634 rt.alloc::<F::Output>(),
635 (rt.0).1.clone(),
636 DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
637 Some(future.boxed_local()),
638 context,
639 ))),
640 time,
641 );
642
643 (rt.0).4.fetch_add(1, Ordering::Relaxed);
644 Default::default()
645 })
646 }
647
648 fn block_on<F>(&self, future: F) -> Result<F::Output>
649 where
650 F: Future + 'static,
651 <F as Future>::Output: Default + 'static {
652 let runner = SingleTaskRunner {
653 is_running: AtomicBool::new(true),
654 runtime: self.clone(),
655 clock: Clock::new(),
656 };
657 let mut result: Option<<F as Future>::Output> = None;
658 let result_raw = (&mut result) as *mut Option<<F as Future>::Output> as usize;
659
660 self.spawn(async move {
661 let r = future.await;
663 unsafe {
664 *(result_raw as *mut Option<<F as Future>::Output>) = Some(r);
665 }
666
667 Default::default()
668 });
669
670 loop {
671 while runner.run()? > 0 {}
673
674 if let Some(result) = result.take() {
676 return Ok(result);
678 }
679 }
680 }
681}
682
683impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>
684 SingleTaskRuntime<O, P>
685{
686 pub fn to_local_runtime(&self) -> LocalAsyncRuntime<O> {
688 LocalAsyncRuntime::new(
689 self.as_raw(),
690 SingleTaskRuntime::<O, P>::get_id_raw,
691 SingleTaskRuntime::<O, P>::spawn_raw,
692 SingleTaskRuntime::<O, P>::spawn_timing_raw,
693 SingleTaskRuntime::<O, P>::timeout_raw,
694 )
695 }
696
697 #[inline]
699 pub(crate) fn as_raw(&self) -> *const () {
700 Arc::into_raw(self.0.clone()) as *const ()
701 }
702
703 #[inline]
705 pub(crate) fn from_raw(raw: *const ()) -> Self {
706 let inner = unsafe {
707 Arc::from_raw(
708 raw as *const (
709 usize,
710 Arc<P>,
711 Sender<(usize, AsyncTimingTask<P, O>)>,
712 AsyncTaskTimer<P, O>,
713 AtomicUsize,
714 AtomicUsize,
715 ),
716 )
717 };
718 SingleTaskRuntime(inner)
719 }
720
721 pub(crate) fn get_id_raw(raw: *const ()) -> usize {
723 let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
724 let id = rt.get_id();
725 Arc::into_raw(rt.0); id
727 }
728
729 pub(crate) fn spawn_raw(raw: *const (), future: LocalBoxFuture<'static, O>) -> Result<()> {
731 let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
732 let result = rt.spawn_by_id(rt.alloc::<O>(), future);
733 Arc::into_raw(rt.0); result
735 }
736
737 pub(crate) fn spawn_timing_raw(
739 raw: *const (),
740 future: LocalBoxFuture<'static, O>,
741 timeout: usize,
742 ) -> Result<()> {
743 let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
744 let result = rt.spawn_timing_by_id(rt.alloc::<O>(), future, timeout);
745 Arc::into_raw(rt.0); result
747 }
748
749 pub(crate) fn timeout_raw(raw: *const (), timeout: usize) -> LocalBoxFuture<'static, ()> {
751 let rt = SingleTaskRuntime::<O, P>::from_raw(raw);
752 let boxed = rt.timeout(timeout);
753 Arc::into_raw(rt.0); boxed
755 }
756}
757
758pub struct SingleTaskRunner<
762 O: Default + 'static,
763 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = SingleTaskPool<O>,
764> {
765 is_running: AtomicBool, runtime: SingleTaskRuntime<O, P>, clock: Clock, }
769
770unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Send
771 for SingleTaskRunner<O, P>
772{
773}
774unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Sync
775 for SingleTaskRunner<O, P>
776{
777}
778
779impl<O: Default + 'static> Default for SingleTaskRunner<O> {
780 fn default() -> Self {
781 SingleTaskRunner::new(SingleTaskPool::default())
782 }
783}
784
785impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>
786 SingleTaskRunner<O, P>
787{
788 pub fn new(pool: P) -> Self {
790 let rt_uid = pool.get_thread_id() >> 32;
791 let pool = Arc::new(pool);
792
793 let timer = AsyncTaskTimer::new();
795 let producor = timer.get_producor().clone();
796 let timer_producor_count = AtomicUsize::new(0);
797 let timer_consume_count = AtomicUsize::new(0);
798
799 let runtime = SingleTaskRuntime(Arc::new((rt_uid,
801 pool,
802 producor,
803 timer,
804 timer_producor_count,
805 timer_consume_count)));
806
807 SingleTaskRunner {
808 is_running: AtomicBool::new(false),
809 runtime,
810 clock: Clock::new(),
811 }
812 }
813
814 pub fn get_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
816 (self.runtime.0).1.get_thread_waker().cloned()
817 }
818
819 pub fn startup(&self) -> Option<SingleTaskRuntime<O, P>> {
821 if cfg!(target_arch = "aarch64") {
822 match self
823 .is_running
824 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
825 {
826 Ok(false) => {
827 Some(self.runtime.clone())
829 }
830 _ => {
831 None
833 }
834 }
835 } else {
836 match self.is_running.compare_exchange_weak(
837 false,
838 true,
839 Ordering::SeqCst,
840 Ordering::SeqCst,
841 ) {
842 Ok(false) => {
843 Some(self.runtime.clone())
845 }
846 _ => {
847 None
849 }
850 }
851 }
852 }
853
854 pub fn run_once(&self) -> Result<usize> {
856 if !self.is_running.load(Ordering::Relaxed) {
857 return Err(Error::new(
859 ErrorKind::Other,
860 "Single thread runtime not running",
861 ));
862 }
863
864 let mut pop_len = 0;
866 (self.runtime.0)
867 .4
868 .fetch_add((self.runtime.0).3.consume(),
869 Ordering::Relaxed);
870 loop {
871 let current_time = (self.runtime.0).3.is_require_pop();
872 if let Some(current_time) = current_time {
873 let timed_out = (self.runtime.0).3.pop(current_time);
875 if let Some((_handle, timing_task)) = timed_out {
876 match timing_task {
877 AsyncTimingTask::Pended(expired) => {
878 self.runtime.wakeup::<O>(&expired);
880 if let Some(task) = (self.runtime.0).1.try_pop() {
881 run_task(task);
882 }
883 }
884 AsyncTimingTask::WaitRun(expired) => {
885 (self.runtime.0).1.push_priority(DEFAULT_MAX_HIGH_PRIORITY_BOUNDED, expired);
887 if let Some(task) = (self.runtime.0).1.try_pop() {
888 run_task(task);
889 }
890 }
891 }
892 pop_len += 1;
893 }
894 } else {
895 break;
897 }
898 }
899 (self.runtime.0)
900 .5
901 .fetch_add(pop_len,
902 Ordering::Relaxed);
903
904 match (self.runtime.0).1.try_pop() {
906 None => {
907 return Ok(0);
909 }
910 Some(task) => {
911 run_task(task);
912 }
913 }
914
915 Ok((self.runtime.0).1.len())
916 }
917
918 pub fn run(&self) -> Result<usize> {
920 if !self.is_running.load(Ordering::Relaxed) {
921 return Err(Error::new(
923 ErrorKind::Other,
924 "Single thread runtime not running",
925 ));
926 }
927
928 loop {
929 let mut pop_len = 0;
931 let mut start_run_millis = self.clock.recent(); (self.runtime.0)
933 .4
934 .fetch_add((self.runtime.0).3.consume(),
935 Ordering::Relaxed);
936 loop {
937 let current_time = (self.runtime.0).3.is_require_pop();
938 if let Some(current_time) = current_time {
939 let timed_out = (self.runtime.0).3.pop(current_time);
941 if let Some((handle, timing_task)) = timed_out {
942 match timing_task {
943 AsyncTimingTask::Pended(expired) => {
944 self.runtime.wakeup::<O>(&expired);
946 if let Some(task) = (self.runtime.0).1.try_pop() {
947 run_task(task);
948 }
949 }
950 AsyncTimingTask::WaitRun(expired) => {
951 (self.runtime.0).1.push_priority(handle, expired);
953 if let Some(task) = (self.runtime.0).1.try_pop() {
954 run_task(task);
955 }
956 }
957 }
958 pop_len += 1;
959 }
960 } else {
961 break;
963 }
964 }
965 (self.runtime.0)
966 .5
967 .fetch_add(pop_len,
968 Ordering::Relaxed);
969
970 while self
972 .clock
973 .recent()
974 .duration_since(start_run_millis)
975 .as_millis() < 1 {
976 match (self.runtime.0).1.try_pop() {
977 None => {
978 return Ok((self.runtime.0).1.len());
980 }
981 Some(task) => {
982 run_task(task);
983 }
984 }
985 }
986 }
987 }
988
989 pub fn into_local(self) -> SingleTaskRuntime<O, P> {
991 self.runtime
992 }
993}
994
995#[inline]
997fn run_task<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>(
998 task: Arc<AsyncTask<P, O>>,
999) {
1000 let waker = waker_ref(&task);
1001 let mut context = Context::from_waker(&*waker);
1002 if let Some(mut future) = task.get_inner() {
1003 if let Poll::Pending = future.as_mut().poll(&mut context) {
1004 task.set_inner(Some(future));
1006 }
1007 }
1008}