1use std::thread;
2use std::any::Any;
3use std::pin::Pin;
4use std::ptr::null_mut;
5use std::vec::IntoIter;
6use std::time::Duration;
7use std::future::Future;
8use std::marker::PhantomData;
9use std::ops::{Deref, DerefMut};
10use std::cell::{RefCell, UnsafeCell};
11use std::task::{Poll, Waker, Context};
12use std::io::{Error, Result, ErrorKind};
13use std::fmt::{Debug, Formatter, Result as FmtResult};
14use std::sync::{Arc, atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering}};
15
16use futures::{future::{FutureExt, LocalBoxFuture},
17 stream::{Stream, StreamExt, LocalBoxStream},
18 task::ArcWake};
19use parking_lot::{Mutex, Condvar};
20use crossbeam_queue::ArrayQueue;
21use crossbeam_channel::{Sender, Receiver, unbounded};
22use flume::{Sender as AsyncSender, Receiver as AsyncReceiver};
23#[cfg(not(target_arch = "wasm32"))]
24use polling::Poller;
25use num_cpus;
26
27use pi_cancel_timer::Timer;
28use slotmap::{Key, KeyData};
29use quanta::{Clock, Instant as QInstant};
30
31use crate::{lock::spin,
32 rt::{PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME, TaskId, AsyncPipelineResult,
33 serial_local_thread::{LocalTaskRunner, LocalTaskRuntime},
34 serial_single_thread::SingleTaskRuntime,
35 serial_worker_thread::{WorkerTaskRunner, WorkerRuntime}}};
36
37pub struct AsyncTask<
41 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
42 O: Default + 'static = (),
43> {
44 uid: TaskId, future: Mutex<Option<LocalBoxFuture<'static, O>>>, pool: Arc<P>, priority: usize, context: Option<UnsafeCell<Box<dyn Any>>>, }
50
51unsafe impl<
52 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
53 O: Default + 'static,
54> Send for AsyncTask<P, O> {}
55unsafe impl<
56 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
57 O: Default + 'static,
58> Sync for AsyncTask<P, O> {}
59
60impl<
61 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
62 O: Default + 'static,
63> ArcWake for AsyncTask<P, O> {
64 #[cfg(not(target_arch = "aarch64"))]
65 fn wake_by_ref(arc_self: &Arc<Self>) {
66 let pool = arc_self.get_pool();
67 let _ = pool.push_keep(arc_self.clone());
68
69 if let Some(waits) = pool.get_waits() {
70 if let Some(worker_waker) = waits.pop() {
72 let (is_sleep, lock, condvar) = &*worker_waker;
74 let locked = lock.lock();
75 if is_sleep.load(Ordering::Relaxed) {
76 if let Ok(true) = is_sleep
78 .compare_exchange_weak(true,
79 false,
80 Ordering::SeqCst,
81 Ordering::SeqCst) {
82 condvar.notify_one();
84 }
85 }
86 }
87 } else {
88 if let Some(thread_waker) = pool.get_thread_waker() {
90 if thread_waker.0.load(Ordering::Relaxed) {
92 let (is_sleep, lock, condvar) = &**thread_waker;
93 let locked = lock.lock();
94 if let Ok(true) = is_sleep
96 .compare_exchange_weak(true,
97 false,
98 Ordering::SeqCst,
99 Ordering::SeqCst) {
100 condvar.notify_one();
102 }
103 }
104 }
105 }
106 }
107 #[cfg(target_arch = "aarch64")]
108 fn wake_by_ref(arc_self: &Arc<Self>) {
109 let pool = arc_self.get_pool();
110 let _ = pool.push_keep(arc_self.clone());
111
112 if let Some(waits) = pool.get_waits() {
113 if let Some(worker_waker) = waits.pop() {
115 let (is_sleep, lock, condvar) = &*worker_waker;
117 let locked = lock.lock();
118 if is_sleep.load(Ordering::Relaxed) {
119 if let Ok(true) = is_sleep
121 .compare_exchange(true,
122 false,
123 Ordering::SeqCst,
124 Ordering::SeqCst) {
125 condvar.notify_one();
127 }
128 }
129 }
130 } else {
131 if let Some(thread_waker) = pool.get_thread_waker() {
133 if thread_waker.0.load(Ordering::Relaxed) {
135 let (is_sleep, lock, condvar) = &**thread_waker;
136 let locked = lock.lock();
137 if let Ok(true) = is_sleep
139 .compare_exchange(true,
140 false,
141 Ordering::SeqCst,
142 Ordering::SeqCst) {
143 condvar.notify_one();
145 }
146 }
147 }
148 }
149 }
150}
151
152impl<
153 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
154 O: Default + 'static,
155> AsyncTask<P, O> {
156 pub fn new(uid: TaskId,
158 pool: Arc<P>,
159 priority: usize,
160 future: Option<LocalBoxFuture<'static, O>>) -> AsyncTask<P, O> {
161 AsyncTask {
162 uid,
163 future: Mutex::new(future),
164 pool,
165 priority,
166 context: None,
167 }
168 }
169
170 pub fn with_context<C: 'static>(uid: TaskId,
172 pool: Arc<P>,
173 priority: usize,
174 future: Option<LocalBoxFuture<'static, O>>,
175 context: C) -> AsyncTask<P, O> {
176 let any = Box::new(context);
177
178 AsyncTask {
179 uid,
180 future: Mutex::new(future),
181 pool,
182 priority,
183 context: Some(UnsafeCell::new(any)),
184 }
185 }
186
187 pub fn with_runtime_and_context<RT, C>(runtime: &RT,
189 priority: usize,
190 future: Option<LocalBoxFuture<'static, O>>,
191 context: C) -> AsyncTask<P, O>
192 where RT: AsyncRuntime<O, Pool = P>,
193 C: 'static {
194 let any = Box::new(context);
195
196 AsyncTask {
197 uid: runtime.alloc::<O>(),
198 future: Mutex::new(future),
199 pool: runtime.shared_pool(),
200 priority,
201 context: Some(UnsafeCell::new(any)),
202 }
203 }
204
205 pub fn is_enable_wakeup(&self) -> bool {
207 self.uid.exist_waker::<O>()
208 }
209
210 pub fn get_inner(&self) -> Option<LocalBoxFuture<'static, O>> {
212 self.future.lock().take()
213 }
214
215 pub fn set_inner(&self, inner: Option<LocalBoxFuture<'static, O>>) {
217 *self.future.lock() = inner;
218 }
219
220 #[inline]
222 pub fn owner(&self) -> usize {
223 unsafe {
224 *self.uid.0.get() as usize
225 }
226 }
227
228 pub fn priority(&self) -> usize {
230 self.priority
231 }
232
233 pub fn exist_context(&self) -> bool {
235 self.context.is_some()
236 }
237
238 pub fn get_context<C: 'static>(&self) -> Option<&C> {
240 if let Some(context) = &self.context {
241 let any = unsafe { &*context.get() };
243 return <dyn Any>::downcast_ref::<C>(&**any);
244 }
245
246 None
247 }
248
249 pub fn get_context_mut<C: 'static>(&self) -> Option<&mut C> {
251 if let Some(context) = &self.context {
252 let any = unsafe { &mut *context.get() };
254 return <dyn Any>::downcast_mut::<C>(&mut **any);
255 }
256
257 None
258 }
259
260 pub fn set_context<C: 'static>(&self, new: C) {
262 if let Some(context) = &self.context {
263 let _ = unsafe { &*context.get() };
265
266 let any: Box<dyn Any + 'static> = Box::new(new);
268 unsafe { *context.get() = any; }
269 }
270 }
271
272 pub fn get_pool(&self) -> &P {
274 self.pool.as_ref()
275 }
276}
277
278pub trait AsyncTaskPool<O: Default + 'static = ()>: Default + 'static {
282 type Pool: AsyncTaskPoolExt<O> + AsyncTaskPool<O>;
283
284 fn get_thread_id(&self) -> usize;
286
287 fn len(&self) -> usize;
289
290 fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
292
293 fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
295
296 fn push_priority(&self, priority: usize, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
298
299 fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
301
302 fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>>;
304
305 fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>>;
307
308 fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>> {
310 None
311 }
312}
313
314pub trait AsyncTaskPoolExt<O: Default + 'static = ()>: 'static {
318 fn set_waits(&mut self,
320 _waits: Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>) {}
321
322 fn get_waits(&self) -> Option<&Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>> {
324 None
326 }
327
328 fn idler_len(&self) -> usize {
330 0
332 }
333
334 fn spawn_worker(&self) -> Option<usize> {
336 None
338 }
339
340 fn worker_len(&self) -> usize {
342 #[cfg(not(target_arch = "wasm32"))]
344 return num_cpus::get();
345 #[cfg(target_arch = "wasm32")]
346 return 1;
347 }
348
349 fn buffer_len(&self) -> usize {
351 0
353 }
354
355 fn set_thread_waker(&mut self, _thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) {
357 }
359
360 fn clone_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
362 None
364 }
365
366 fn close_worker(&self) {
368 }
370}
371
372pub trait AsyncRuntime<O: Default + 'static = ()>: Clone + Send + Sync + 'static {
376 type Pool: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = Self::Pool>;
377
378 fn shared_pool(&self) -> Arc<Self::Pool>;
380
381 fn get_id(&self) -> usize;
383
384 fn wait_len(&self) -> usize;
386
387 fn len(&self) -> usize;
389
390 fn alloc<R: 'static>(&self) -> TaskId;
392
393 fn spawn<F>(&self, future: F) -> Result<TaskId>
395 where F: Future<Output = O> + 'static;
396
397 fn spawn_local<F>(&self, future: F) -> Result<TaskId>
399 where F: Future<Output = O> + 'static;
400
401 fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
403 where F: Future<Output = O> + 'static;
404
405 fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
407 where F: Future<Output = O> + 'static;
408
409 fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
411 where F: Future<Output = O> + 'static;
412
413 fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
415 where F: Future<Output = O> + 'static;
416
417 fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
419 where F: Future<Output = O> + 'static;
420
421 fn spawn_priority_by_id<F>(&self,
423 task_id: TaskId,
424 priority: usize,
425 future: F) -> Result<()>
426 where F: Future<Output = O> + 'static;
427
428 fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
430 where F: Future<Output = O> + 'static;
431
432 fn spawn_timing_by_id<F>(&self,
434 task_id: TaskId,
435 future: F,
436 time: usize) -> Result<()>
437 where F: Future<Output = O> + 'static;
438
439 fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output>;
441
442 fn wakeup<Output: 'static>(&self, task_id: &TaskId);
444
445 fn wait<V: 'static>(&self) -> AsyncWait<V>;
447
448 fn wait_any<V: 'static>(&self, capacity: usize) -> AsyncWaitAny<V>;
450
451 fn wait_any_callback<V: 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V>;
453
454 fn map_reduce<V: 'static>(&self, capacity: usize) -> AsyncMapReduce<V>;
456
457 fn timeout(&self, timeout: usize) -> LocalBoxFuture<'static, ()>;
459
460 fn yield_now(&self) -> LocalBoxFuture<'static, ()>;
462
463 fn pipeline<S, SO, F, FO>(&self, input: S, filter: F) -> LocalBoxStream<'static, FO>
465 where S: Stream<Item = SO> + 'static,
466 SO: 'static,
467 F: FnMut(SO) -> AsyncPipelineResult<FO> + 'static,
468 FO: 'static;
469
470 fn close(&self) -> bool;
472}
473
474pub trait AsyncRuntimeExt<O: Default + 'static = ()> {
478 fn spawn_with_context<F, C>(&self,
480 task_id: TaskId,
481 future: F,
482 context: C) -> Result<()>
483 where F: Future<Output = O> + 'static,
484 C: 'static;
485
486 fn spawn_timing_with_context<F, C>(&self,
488 task_id: TaskId,
489 future: F,
490 context: C,
491 time: usize) -> Result<()>
492 where F: Future<Output = O> + 'static,
493 C: 'static;
494
495 fn block_on<F>(&self, future: F) -> Result<F::Output>
497 where F: Future + 'static,
498 <F as Future>::Output: Default + 'static;
499}
500
501pub struct AsyncRuntimeBuilder<O: Default + 'static = ()>(PhantomData<O>);
505
506impl<O: Default + 'static> AsyncRuntimeBuilder<O> {
507 pub fn default_local_thread(name: Option<&str>,
509 stack_size: Option<usize>) -> LocalTaskRuntime<O> {
510 let runner = LocalTaskRunner::new();
511
512 let thread_name = if let Some(name) = name {
513 name
514 } else {
515 "Default-Local-RT"
517 };
518 let thread_stack_size = if let Some(size) = stack_size {
519 size
520 } else {
521 2 * 1024 * 1024
523 };
524
525 runner.startup(thread_name, thread_stack_size)
526 }
527
528 pub fn default_worker_thread(worker_name: Option<&str>,
530 worker_stack_size: Option<usize>,
531 worker_sleep_timeout: Option<u64>,
532 worker_loop_interval: Option<Option<u64>>) -> WorkerRuntime<O> {
533 let runner = WorkerTaskRunner::default();
534
535 let thread_name = if let Some(name) = worker_name {
536 name
537 } else {
538 "Default-Single-Worker"
540 };
541 let thread_stack_size = if let Some(size) = worker_stack_size {
542 size
543 } else {
544 2 * 1024 * 1024
546 };
547 let sleep_timeout = if let Some(timeout) = worker_sleep_timeout {
548 timeout
549 } else {
550 1
552 };
553 let loop_interval = if let Some(interval) = worker_loop_interval {
554 interval
555 } else {
556 None
558 };
559
560 let clock = Clock::new();
562 let runner_copy = runner.clone();
563 let rt_copy = runner.get_runtime();
564 let rt = runner.startup(
565 thread_name,
566 thread_stack_size,
567 sleep_timeout,
568 loop_interval,
569 move || {
570 let now = clock.recent();
571 match runner_copy.run_once() {
572 Err(e) => {
573 panic!("Run runner failed, reason: {:?}", e);
574 },
575 Ok(len) => {
576 (len == 0,
577 clock
578 .recent()
579 .duration_since(now))
580 },
581 }
582 },
583 move || {
584 rt_copy.wait_len() + rt_copy.len()
585 },
586 );
587
588 rt
589 }
590
591 #[cfg(not(target_arch = "wasm32"))]
593 pub fn custom_local_thread(name: Option<&str>,
594 stack_size: Option<usize>,
595 poller: Option<Arc<Poller>>,
596 try_count: Option<usize>,
597 timeout: Option<Duration>,) -> LocalTaskRuntime<O> {
598 let poller = if let Some(poller) = poller {
599 poller
600 } else {
601 Arc::new(Poller::new().expect("Failed to create poller"))
602 };
603 let runner = LocalTaskRunner::with_poll(poller);
604
605 let thread_name = if let Some(name) = name {
606 name
607 } else {
608 "Custom-Local-RT"
610 };
611 let thread_stack_size = if let Some(size) = stack_size {
612 size
613 } else {
614 2 * 1024 * 1024
616 };
617 let try_count = try_count.unwrap_or(3);
618
619 runner.startup_with_poll(
620 thread_name,
621 thread_stack_size,
622 try_count,
623 timeout
624 )
625 }
626
627 pub fn custom_worker_thread<P, F0, F1>(pool: P,
629 worker_handle: Arc<AtomicBool>,
630 worker_condvar: Arc<(AtomicBool, Mutex<()>, Condvar)>,
631 thread_name: &str,
632 thread_stack_size: usize,
633 sleep_timeout: u64,
634 loop_interval: Option<u64>,
635 loop_func: F0,
636 get_queue_len: F1) -> WorkerRuntime<O, P>
637 where P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
638 F0: Fn() -> (bool, Duration) + Send + 'static,
639 F1: Fn() -> usize + Send + 'static {
640 let runner = WorkerTaskRunner::new(pool,
641 worker_handle,
642 worker_condvar);
643
644 let rt_copy = runner.get_runtime();
646 let rt = runner.startup(
647 thread_name,
648 thread_stack_size,
649 sleep_timeout,
650 loop_interval,
651 loop_func,
652 move || {
653 rt_copy.wait_len() + get_queue_len()
654 },
655 );
656
657 rt
658 }
659}
660
661pub fn bind_local_thread<O: Default + 'static>(runtime: LocalAsyncRuntime<O>) {
663 match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |rt| {
664 let raw = Arc::into_raw(Arc::new(runtime)) as *mut LocalAsyncRuntime<O> as *mut ();
665 rt.store(raw, Ordering::Relaxed);
666 }) {
667 Err(e) => {
668 panic!("Bind single runtime to local thread failed, reason: {:?}", e);
669 },
670 Ok(_) => (),
671 }
672}
673
674pub fn unbind_local_thread() {
676 let _ = PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |rt| {
677 rt.store(null_mut(), Ordering::Relaxed);
678 });
679}
680
681pub struct LocalAsyncRuntime<O: Default + 'static> {
685 inner: *const (), get_id_func: fn(*const ()) -> usize, spawn_func: fn(*const (), LocalBoxFuture<'static, O>) -> Result<()>, spawn_timing_func: fn(*const (), LocalBoxFuture<'static, O>, usize) -> Result<()>, timeout_func: fn(*const (), usize) -> LocalBoxFuture<'static, ()>, }
691
692unsafe impl<O: Default + 'static> Send for LocalAsyncRuntime<O> {}
693unsafe impl<O: Default + 'static> Sync for LocalAsyncRuntime<O> {}
694
695impl<O: Default + 'static> LocalAsyncRuntime<O> {
696 pub fn new(inner: *const (),
698 get_id_func: fn(*const ()) -> usize,
699 spawn_func: fn(*const (), LocalBoxFuture<'static, O>) -> Result<()>,
700 spawn_timing_func: fn(*const (), LocalBoxFuture<'static, O>, usize) -> Result<()>,
701 timeout_func: fn(*const (), usize) -> LocalBoxFuture<'static, ()>) -> Self {
702 LocalAsyncRuntime {
703 inner,
704 get_id_func,
705 spawn_func,
706 spawn_timing_func,
707 timeout_func,
708 }
709 }
710
711 #[inline]
713 pub fn get_id(&self) -> usize {
714 (self.get_id_func)(self.inner)
715 }
716
717 #[inline]
719 pub fn spawn<F>(&self, future: F) -> Result<()>
720 where F: Future<Output = O> + 'static {
721 (self.spawn_func)(self.inner, async move {
722 future.await
723 }.boxed_local())
724 }
725
726 #[inline]
728 pub fn sapwn_timing_func<F>(&self, future: F, timeout: usize) -> Result<()>
729 where F: Future<Output = O> + 'static {
730 (self.spawn_timing_func)(self.inner,
731 async move {
732 future.await
733 }.boxed_local(),
734 timeout)
735 }
736
737 #[inline]
739 pub fn timeout(&self, timeout: usize) -> LocalBoxFuture<'static, ()> {
740 (self.timeout_func)(self.inner, timeout)
741 }
742}
743
744pub fn local_serial_async_runtime<O: Default + 'static>() -> Option<Arc<LocalAsyncRuntime<O>>> {
749 match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |ptr| {
750 let raw = ptr.load(Ordering::Relaxed) as *const LocalAsyncRuntime<O>;
751 unsafe {
752 if raw.is_null() {
753 None
755 } else {
756 let shared: Arc<LocalAsyncRuntime<O>> = unsafe { Arc::from_raw(raw) };
758 let result = shared.clone();
759 Arc::into_raw(shared); Some(result)
761 }
762 }
763 }) {
764 Err(_) => None, Ok(rt) => rt,
766 }
767}
768
769pub fn spawn_local<O, F>(future: F) -> Result<()>
774 where O: Default + 'static,
775 F: Future<Output = O> + 'static {
776 if let Some(rt) = local_serial_async_runtime::<O>() {
777 rt.spawn(future)
778 } else {
779 Err(Error::new(ErrorKind::Other, format!("Spawn task to local thread failed, reason: runtime not exist")))
780 }
781}
782
783pub fn local_async_runtime<O: Default + 'static>() -> Option<Arc<LocalAsyncRuntime<O>>> {
788 match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |ptr| {
789 let raw = ptr.load(Ordering::Relaxed) as *const LocalAsyncRuntime<O>;
790 unsafe {
791 if raw.is_null() {
792 None
794 } else {
795 let shared: Arc<LocalAsyncRuntime<O>> = unsafe { Arc::from_raw(raw) };
797 let result = shared.clone();
798 Arc::into_raw(shared); Some(result)
800 }
801 }
802 }) {
803 Err(_) => None, Ok(rt) => rt,
805 }
806}
807
808pub struct AsyncValue<V: 'static>(Arc<InnerAsyncValue<V>>);
812
813unsafe impl<V: 'static> Send for AsyncValue<V> {}
814unsafe impl<V: 'static> Sync for AsyncValue<V> {}
815
816impl<V: 'static> Clone for AsyncValue<V> {
817 fn clone(&self) -> Self {
818 AsyncValue(self.0.clone())
819 }
820}
821
822impl<V: Send + 'static> Debug for AsyncValue<V> {
823 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
824 write!(f,
825 "AsyncValue[status = {}]",
826 self.0.status.load(Ordering::Acquire))
827 }
828}
829
830impl<V: 'static> Future for AsyncValue<V> {
831 type Output = V;
832
833 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
834 let mut spin_len = 1;
835 while self.0.status.load(Ordering::Acquire) == 2 {
836 spin_len = spin(spin_len);
838 }
839
840 if self.0.status.load(Ordering::Acquire) == 3 {
841 if let Some(value) = unsafe { (*(&self).0.value.get()).take() } {
842 return Poll::Ready(value);
844 }
845 }
846
847 unsafe {
848 *self.0.waker.get() = Some(cx.waker().clone()); }
850
851 let mut spin_len = 1;
852 loop {
853 match self.0.status.compare_exchange(0,
854 1, Ordering::Acquire,
855 Ordering::Relaxed) {
856 Err(2) => {
857 spin_len = spin(spin_len);
859 continue;
860 },
861 Err(3) => {
862 let value = unsafe { (*(&self).0.value.get()).take().unwrap() };
864 return Poll::Ready(value);
865 },
866 Err(_) => {
867 unimplemented!();
868 },
869 Ok(_) => {
870 return Poll::Pending;
872 },
873 }
874 }
875 }
876}
877
878impl<V: 'static> AsyncValue<V> {
882 pub fn new() -> Self {
884 let inner = InnerAsyncValue {
885 value: UnsafeCell::new(None),
886 waker: UnsafeCell::new(None),
887 status: AtomicU8::new(0),
888 };
889
890 AsyncValue(Arc::new(inner))
891 }
892
893 pub fn is_complete(&self) -> bool {
895 self
896 .0
897 .status
898 .load(Ordering::Relaxed) == 3
899 }
900
901 pub fn set(self, value: V) {
903 loop {
904 match self.0.status.compare_exchange(1,
905 2,
906 Ordering::Acquire,
907 Ordering::Relaxed) {
908 Err(0) => {
909 match self.0.status.compare_exchange(0,
910 2,
911 Ordering::Acquire,
912 Ordering::Relaxed) {
913 Err(1) => {
914 continue;
916 },
917 Err(_) => {
918 return;
920 },
921 Ok(_) => {
922 unsafe { *self.0.value.get() = Some(value); }
924 self.0.status.store(3, Ordering::Release);
925 return;
926 }
927 }
928 },
929 Err(_) => {
930 return;
932 },
933 Ok(_) => {
934 break;
936 }
937 }
938 }
939
940 unsafe { *self.0.value.get() = Some(value); }
942 self.0.status.store(3, Ordering::Release);
943 let waker = unsafe { (*self.0.waker.get()).take().unwrap() };
944 waker.wake();
945 }
946}
947
948pub struct InnerAsyncValue<V: 'static> {
950 value: UnsafeCell<Option<V>>, waker: UnsafeCell<Option<Waker>>, status: AtomicU8, }
954
955pub struct AsyncVariableGuard<'a, V: 'static> {
959 value: &'a UnsafeCell<Option<V>>, waker: &'a UnsafeCell<Option<Waker>>, status: &'a AtomicU8, }
963
964unsafe impl<V: 'static> Send for AsyncVariableGuard<'_, V> {}
965
966impl<V: 'static> Drop for AsyncVariableGuard<'_, V> {
967 fn drop(&mut self) {
968 self.status.fetch_sub(2, Ordering::Relaxed);
972 }
973}
974
975impl<V: 'static> Deref for AsyncVariableGuard<'_, V> {
976 type Target = Option<V>;
977
978 fn deref(&self) -> &Self::Target {
979 unsafe {
980 &*self.value.get()
981 }
982 }
983}
984
985impl<V: 'static> DerefMut for AsyncVariableGuard<'_, V> {
986 fn deref_mut(&mut self) -> &mut Self::Target {
987 unsafe {
988 &mut *self.value.get()
989 }
990 }
991}
992
993impl<V: 'static> AsyncVariableGuard<'_, V> {
994 pub fn finish(self) {
996 if self.status.fetch_add(4, Ordering::Relaxed) == 3 {
998 if let Some(waker) = unsafe { (&mut *self.waker.get()).take() } {
999 waker.wake();
1001 }
1002 }
1003 }
1004}
1005
1006pub struct AsyncVariable<V: 'static>(Arc<InnerAsyncVariable<V>>);
1010
1011unsafe impl<V: 'static> Send for AsyncVariable<V> {}
1012unsafe impl<V: 'static> Sync for AsyncVariable<V> {}
1013
1014impl<V: 'static> Clone for AsyncVariable<V> {
1015 fn clone(&self) -> Self {
1016 AsyncVariable(self.0.clone())
1017 }
1018}
1019
1020impl<V: 'static> Future for AsyncVariable<V> {
1021 type Output = V;
1022
1023 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1024 unsafe {
1025 *self.0.waker.get() = Some(cx.waker().clone()); }
1027
1028 let mut spin_len = 1;
1029 loop {
1030 match self.0.status.compare_exchange(0,
1031 1,
1032 Ordering::Acquire,
1033 Ordering::Relaxed) {
1034 Err(current) if current & 4 != 0 => {
1035 unsafe {
1037 let _ = (&mut *self.0.waker.get()).take(); return Poll::Ready((&mut *(&self).0.value.get()).take().unwrap());
1039 }
1040 },
1041 Err(_) => {
1042 spin_len = spin(spin_len);
1044 },
1045 Ok(_) => {
1046 return Poll::Pending;
1048 },
1049 }
1050 }
1051 }
1052}
1053
1054impl<V: 'static> AsyncVariable<V> {
1055 pub fn new() -> Self {
1057 let inner = InnerAsyncVariable {
1058 value: UnsafeCell::new(None),
1059 waker: UnsafeCell::new(None),
1060 status: AtomicU8::new(0),
1061 };
1062
1063 AsyncVariable(Arc::new(inner))
1064 }
1065
1066 pub fn is_complete(&self) -> bool {
1068 self
1069 .0
1070 .status
1071 .load(Ordering::Acquire) & 4 != 0
1072 }
1073
1074 pub fn lock(&self) -> Option<AsyncVariableGuard<V>> {
1076 let mut spin_len = 1;
1077 loop {
1078 match self
1079 .0
1080 .status
1081 .compare_exchange(1,
1082 3,
1083 Ordering::Acquire,
1084 Ordering::Relaxed) {
1085 Err(0) => {
1086 match self
1088 .0
1089 .status
1090 .compare_exchange(0,
1091 2,
1092 Ordering::Acquire,
1093 Ordering::Relaxed) {
1094 Err(1) => {
1095 continue;
1097 },
1098 Err(2) => {
1099 spin_len = spin(spin_len);
1101 },
1102 Err(3) => {
1103 spin_len = spin(spin_len);
1105 },
1106 Err(_) => {
1107 return None;
1109 },
1110 Ok(_) => {
1111 let guard = AsyncVariableGuard {
1113 value: &self.0.value,
1114 waker: &self.0.waker,
1115 status: &self.0.status,
1116 };
1117
1118 return Some(guard)
1119 },
1120 }
1121 },
1122 Err(2) => {
1123 spin_len = spin(spin_len);
1125 },
1126 Err(3) => {
1127 spin_len = spin(spin_len);
1129 },
1130 Err(_) => {
1131 return None;
1133 }
1134 Ok(_) => {
1135 let guard = AsyncVariableGuard {
1137 value: &self.0.value,
1138 waker: &self.0.waker,
1139 status: &self.0.status,
1140 };
1141
1142 return Some(guard)
1143 },
1144 }
1145 }
1146 }
1147}
1148
1149pub struct InnerAsyncVariable<V: 'static> {
1151 value: UnsafeCell<Option<V>>, waker: UnsafeCell<Option<Waker>>, status: AtomicU8, }
1155
1156pub struct AsyncWaitResult<V: 'static>(pub Arc<RefCell<Option<Result<V>>>>);
1160
1161unsafe impl<V: 'static> Send for AsyncWaitResult<V> {}
1162unsafe impl<V: 'static> Sync for AsyncWaitResult<V> {}
1163
1164impl<V: 'static> Clone for AsyncWaitResult<V> {
1165 fn clone(&self) -> Self {
1166 AsyncWaitResult(self.0.clone())
1167 }
1168}
1169
1170pub struct AsyncWaitResults<V: 'static>(pub Arc<RefCell<Option<Vec<Result<V>>>>>);
1174
1175unsafe impl<V: 'static> Send for AsyncWaitResults<V> {}
1176unsafe impl<V: 'static> Sync for AsyncWaitResults<V> {}
1177
1178impl<V: 'static> Clone for AsyncWaitResults<V> {
1179 fn clone(&self) -> Self {
1180 AsyncWaitResults(self.0.clone())
1181 }
1182}
1183
1184pub enum AsyncTimingTask<
1188 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1189 O: Default + 'static = (),
1190> {
1191 Pended(TaskId), WaitRun(Arc<AsyncTask<P, O>>), }
1194
1195pub struct AsyncTaskTimer<
1199 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1200 O: Default + 'static = (),
1201> {
1202 producor: Sender<(usize, AsyncTimingTask<P, O>)>, consumer: Receiver<(usize, AsyncTimingTask<P, O>)>, timer: Arc<RefCell<Timer<AsyncTimingTask<P, O>, 1000, 60, 3>>>, clock: Clock, now: QInstant, }
1208
1209unsafe impl<
1210 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1211 O: Default + 'static,
1212> Send for AsyncTaskTimer<P, O> {}
1213unsafe impl<
1214 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1215 O: Default + 'static,
1216> Sync for AsyncTaskTimer<P, O> {}
1217
1218impl<
1219 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1220 O: Default + 'static,
1221> AsyncTaskTimer<P, O> {
1222 pub fn new() -> Self {
1224 let (producor, consumer) = unbounded();
1225 let clock = Clock::new();
1226 let now = clock.recent();
1227
1228 AsyncTaskTimer {
1229 producor,
1230 consumer,
1231 timer: Arc::new(RefCell::new(Timer::<AsyncTimingTask<P, O>, 1000, 60, 3>::default())),
1232 clock,
1233 now,
1234 }
1235 }
1236
1237 #[inline]
1239 pub fn get_producor(&self) -> &Sender<(usize, AsyncTimingTask<P, O>)> {
1240 &self.producor
1241 }
1242
1243 #[inline]
1245 pub fn len(&self) -> usize {
1246 let timer = self.timer.as_ref().borrow();
1247 timer.add_count() - timer.remove_count()
1248 }
1249
1250 pub fn set_timer(&self, task: AsyncTimingTask<P, O>, timeout: usize) -> usize {
1252 let current_time = self
1253 .clock
1254 .recent()
1255 .duration_since(self.now)
1256 .as_millis() as u64;
1257 self
1258 .timer
1259 .borrow_mut()
1260 .push_time(current_time + timeout as u64, task)
1261 .data()
1262 .as_ffi() as usize
1263 }
1264
1265 pub fn cancel_timer(&self, timer_ref: usize) -> Option<AsyncTimingTask<P, O>> {
1267 if let Some(item) =self
1268 .timer
1269 .borrow_mut()
1270 .cancel(KeyData::from_ffi(timer_ref as u64).into()) {
1271 Some(item)
1272 } else {
1273 None
1274 }
1275 }
1276
1277 pub fn consume(&self) -> usize {
1279 let mut len = 0;
1280 let timer_tasks = self.consumer.try_iter().collect::<Vec<(usize, AsyncTimingTask<P, O>)>>();
1281 for (timeout, task) in timer_tasks {
1282 self.set_timer(task, timeout);
1283 len += 1;
1284 }
1285
1286 len
1287 }
1288
1289 pub fn is_require_pop(&self) -> Option<u64> {
1291 let current_time = self
1292 .clock
1293 .recent()
1294 .duration_since(self.now)
1295 .as_millis() as u64;
1296 if self.timer.borrow_mut().is_ok(current_time) {
1297 Some(current_time)
1298 } else {
1299 None
1300 }
1301 }
1302
1303 pub fn pop(&self, current_time: u64) -> Option<(usize, AsyncTimingTask<P, O>)> {
1305 if let Some((key, item)) = self.timer.borrow_mut().pop_kv(current_time) {
1306 Some((key.data().as_ffi() as usize, item))
1307 } else {
1308 None
1309 }
1310 }
1311}
1312
1313pub struct AsyncWaitTimeout<
1317 RT: AsyncRuntime<O>,
1318 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1319 O: Default + 'static = (),
1320> {
1321 rt: RT, producor: Sender<(usize, AsyncTimingTask<P, O>)>, timeout: usize, expired: AtomicBool, }
1326
1327unsafe impl<
1328 RT: AsyncRuntime<O>,
1329 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1330 O: Default + 'static,
1331> Send for AsyncWaitTimeout<RT, P, O> {}
1332unsafe impl<
1333 RT: AsyncRuntime<O>,
1334 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1335 O: Default + 'static,
1336> Sync for AsyncWaitTimeout<RT, P, O> {}
1337
1338impl<
1339 RT: AsyncRuntime<O>,
1340 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1341 O: Default + 'static,
1342> Future for AsyncWaitTimeout<RT, P, O> {
1343 type Output = ();
1344
1345 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1346 if (&self).expired.load(Ordering::Relaxed) {
1347 return Poll::Ready(());
1349 } else {
1350 (&self).expired.store(true, Ordering::Relaxed);
1352 }
1353
1354 let task_id = self.rt.alloc::<O>();
1355 let reply = self.rt.pending(&task_id, cx.waker().clone());
1356
1357 (&self).producor.send(((&self).timeout, AsyncTimingTask::Pended(task_id)));
1359 reply
1360 }
1361}
1362
1363impl<
1364 RT: AsyncRuntime<O>,
1365 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1366 O: Default + 'static,
1367> AsyncWaitTimeout<RT, P, O> {
1368 pub fn new(rt: RT,
1370 producor: Sender<(usize, AsyncTimingTask<P, O>)>,
1371 timeout: usize) -> Self {
1372 AsyncWaitTimeout {
1373 rt,
1374 producor,
1375 timeout,
1376 expired: AtomicBool::new(false), }
1378 }
1379}
1380
1381pub struct AsyncWait<V: 'static>(AsyncWaitAny<V>);
1385
1386unsafe impl<V: 'static> Send for AsyncWait<V> {}
1387unsafe impl<V: 'static> Sync for AsyncWait<V> {}
1388
1389impl<V: 'static> AsyncWait<V> {
1393 pub(crate) fn new(inner: AsyncWaitAny<V>) -> Self {
1395 AsyncWait(inner)
1396 }
1397
1398 pub fn spawn<RT, O, F>(&self,
1400 rt: RT,
1401 timeout: Option<usize>,
1402 future: F) -> Result<()>
1403 where RT: AsyncRuntime<O>,
1404 O: Default + 'static,
1405 F: Future<Output = Result<V>> + 'static {
1406 self.0.spawn(rt.clone(), future)?;
1407
1408 if let Some(timeout) = timeout {
1409 let rt_copy = rt.clone();
1411 self.0.spawn(rt, async move {
1412 rt_copy.timeout(timeout).await;
1413
1414 Err(Error::new(ErrorKind::TimedOut, format!("Time out")))
1416 })
1417 } else {
1418 Ok(())
1420 }
1421 }
1422
1423 pub fn spawn_local<O, F>(&self,
1425 timeout: Option<usize>,
1426 future: F) -> Result<()>
1427 where O: Default + 'static,
1428 F: Future<Output = Result<V>> + 'static {
1429 if let Some(rt) = local_serial_async_runtime::<O>() {
1430 self.0.spawn_local(future)?;
1432
1433 if let Some(timeout) = timeout {
1434 let rt_copy = rt.clone();
1436 self.0.spawn_local(async move {
1437 rt_copy.timeout(timeout).await;
1438
1439 Err(Error::new(ErrorKind::TimedOut, format!("Time out")))
1441 })
1442 } else {
1443 Ok(())
1445 }
1446 } else {
1447 Err(Error::new(ErrorKind::Other, format!("Spawn wait task failed, reason: local async runtime not exist")))
1449 }
1450 }
1451}
1452
1453impl<V: 'static> AsyncWait<V> {
1457 pub async fn wait_result(self) -> Result<V> {
1459 self.0.wait_result().await
1460 }
1461}
1462
1463pub struct AsyncWaitAny<V: 'static> {
1467 capacity: usize, producor: AsyncSender<Result<V>>, consumer: AsyncReceiver<Result<V>>, }
1471
1472unsafe impl<V: 'static> Send for AsyncWaitAny<V> {}
1473unsafe impl<V: 'static> Sync for AsyncWaitAny<V> {}
1474
1475impl<V: 'static> AsyncWaitAny<V> {
1479 pub(crate) fn new(capacity: usize,
1481 producor: AsyncSender<Result<V>>,
1482 consumer: AsyncReceiver<Result<V>>) -> Self {
1483 AsyncWaitAny {
1484 capacity,
1485 producor,
1486 consumer,
1487 }
1488 }
1489
1490 pub fn spawn<RT, O, F>(&self,
1492 rt: RT,
1493 future: F) -> Result<()>
1494 where RT: AsyncRuntime<O>,
1495 O: Default + 'static,
1496 F: Future<Output = Result<V>> + 'static {
1497 let producor = self.producor.clone();
1498 rt.spawn_by_id(rt.alloc::<O>(), async move {
1499 let value = future.await;
1500 producor.into_send_async(value).await;
1501
1502 Default::default()
1504 })
1505 }
1506
1507 pub fn spawn_local<F>(&self,
1509 future: F) -> Result<()>
1510 where F: Future<Output = Result<V>> + 'static {
1511 if let Some(rt) = local_serial_async_runtime() {
1512 let producor = self.producor.clone();
1514 rt.spawn(async move {
1515 let value = future.await;
1516 producor.into_send_async(value).await;
1517 })
1518 } else {
1519 Err(Error::new(ErrorKind::Other, format!("Spawn wait any task failed, reason: local async runtime not exist")))
1521 }
1522 }
1523}
1524
1525impl<V: 'static> AsyncWaitAny<V> {
1529 pub async fn wait_result(self) -> Result<V> {
1531 match self.consumer.recv_async().await {
1532 Err(e) => {
1533 Err(Error::new(ErrorKind::Other, format!("Wait any result failed, reason: {:?}", e)))
1535 },
1536 Ok(result) => {
1537 result
1539 },
1540 }
1541 }
1542}
1543
1544pub struct AsyncWaitAnyCallback<V: 'static> {
1548 capacity: usize, producor: AsyncSender<Result<V>>, consumer: AsyncReceiver<Result<V>>, }
1552
1553unsafe impl<V: 'static> Send for AsyncWaitAnyCallback<V> {}
1554unsafe impl<V: 'static> Sync for AsyncWaitAnyCallback<V> {}
1555
1556impl<V: 'static> AsyncWaitAnyCallback<V> {
1560 pub(crate) fn new(capacity: usize,
1562 producor: AsyncSender<Result<V>>,
1563 consumer: AsyncReceiver<Result<V>>) -> Self {
1564 AsyncWaitAnyCallback {
1565 capacity,
1566 producor,
1567 consumer,
1568 }
1569 }
1570
1571 pub fn spawn<RT, O, F>(&self,
1573 rt: RT,
1574 future: F) -> Result<()>
1575 where RT: AsyncRuntime<O>,
1576 O: Default + 'static,
1577 F: Future<Output = Result<V>> + 'static {
1578 let producor = self.producor.clone();
1579 rt.spawn_by_id(rt.alloc::<O>(), async move {
1580 let value = future.await;
1581 producor.into_send_async(value).await;
1582
1583 Default::default()
1585 })
1586 }
1587
1588 pub fn spawn_local<F>(&self,
1590 future: F) -> Result<()>
1591 where F: Future<Output = Result<V>> + 'static {
1592 if let Some(rt) = local_serial_async_runtime() {
1593 let producor = self.producor.clone();
1595 rt.spawn(async move {
1596 let value = future.await;
1597 producor.into_send_async(value).await;
1598 })
1599 } else {
1600 Err(Error::new(ErrorKind::Other, format!("Spawn wait any task failed by callback, reason: current async runtime not exist")))
1602 }
1603 }
1604}
1605
1606impl<V: 'static> AsyncWaitAnyCallback<V> {
1610 pub async fn wait_result(mut self,
1612 callback: impl Fn(&Result<V>) -> bool + 'static) -> Result<V> {
1613 let checker = create_checker(self.capacity, callback);
1614 loop {
1615 match self.consumer.recv_async().await {
1616 Err(e) => {
1617 return Err(Error::new(ErrorKind::Other, format!("Wait any result failed by callback, reason: {:?}", e)));
1619 },
1620 Ok(result) => {
1621 if checker(&result) {
1623 return result;
1625 }
1626 },
1627 }
1628 }
1629 }
1630}
1631
1632fn create_checker<V, F>(len: usize,
1634 callback: F) -> Arc<dyn Fn(&Result<V>) -> bool + 'static>
1635 where V: 'static,
1636 F: Fn(&Result<V>) -> bool + 'static {
1637 let mut check_counter = AtomicUsize::new(len); Arc::new(move |result| {
1639 if check_counter.fetch_sub(1, Ordering::SeqCst) == 1 {
1640 true
1642 } else {
1643 callback(result)
1645 }
1646 })
1647}
1648
1649pub struct AsyncMapReduce<V: 'static> {
1653 count: usize, capacity: usize, producor: AsyncSender<(usize, Result<V>)>, consumer: AsyncReceiver<(usize, Result<V>)>, }
1658
1659unsafe impl<V: 'static> Send for AsyncMapReduce<V> {}
1660
1661impl<V: 'static> AsyncMapReduce<V> {
1665 pub(crate) fn new(count: usize,
1667 capacity: usize,
1668 producor: AsyncSender<(usize, Result<V>)>,
1669 consumer: AsyncReceiver<(usize, Result<V>)>) -> Self {
1670 AsyncMapReduce {
1671 count,
1672 capacity,
1673 producor,
1674 consumer,
1675 }
1676 }
1677
1678 pub fn map<RT, O, F>(&mut self, rt: RT, future: F) -> Result<usize>
1680 where RT: AsyncRuntime<O>,
1681 O: Default + 'static,
1682 F: Future<Output = Result<V>> + 'static {
1683 if self.count >= self.capacity {
1684 return Err(Error::new(ErrorKind::Other, format!("Map task to runtime failed, capacity: {}, reason: out of capacity", self.capacity)));
1686 }
1687
1688 let index = self.count;
1689 let producor = self.producor.clone();
1690 rt.spawn(async move {
1691 let value = future.await;
1692 producor.into_send_async((index, value)).await;
1693
1694 Default::default()
1696 })?;
1697
1698 self.count += 1; Ok(index)
1700 }
1701}
1702
1703impl<V: 'static> AsyncMapReduce<V> {
1707 pub async fn reduce(self, order: bool) -> Result<Vec<Result<V>>> {
1709 let mut count = self.count;
1710 let mut results = Vec::with_capacity(count);
1711 while count > 0 {
1712 match self.consumer.recv_async().await {
1713 Err(e) => {
1714 return Err(Error::new(ErrorKind::Other, format!("Reduce result failed, reason: {:?}", e)));
1716 },
1717 Ok((index, result)) => {
1718 results.push((index, result));
1720 count -= 1;
1721 },
1722 }
1723 }
1724
1725 if order {
1726 results.sort_by_key(|(key, _value)| {
1728 key.clone()
1729 });
1730 }
1731 let (_, values) = results
1732 .into_iter()
1733 .unzip::<usize, Result<V>, Vec<usize>, Vec<Result<V>>>();
1734
1735 Ok(values)
1736 }
1737}
1738
1739pub fn spawn_worker_thread<F0, F1>(thread_name: &str,
1745 thread_stack_size: usize,
1746 thread_handler: Arc<AtomicBool>,
1747 thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>, sleep_timeout: u64, loop_interval: Option<u64>, loop_func: F0,
1751 get_queue_len: F1) -> Arc<AtomicBool>
1752 where F0: Fn() -> (bool, Duration) + Send + 'static,
1753 F1: Fn() -> usize + Send + 'static {
1754 let thread_status_copy = thread_handler.clone();
1755
1756 thread::Builder::new()
1757 .name(thread_name.to_string())
1758 .stack_size(thread_stack_size)
1759 .spawn(move || {
1760 let mut sleep_count = 0;
1761
1762 while thread_handler.load(Ordering::Relaxed) {
1763 let (is_no_task, run_time) = loop_func();
1764
1765 if is_no_task {
1766 if sleep_count > 1 {
1768 sleep_count = 0; let (is_sleep, lock, condvar) = &*thread_waker;
1771 let mut locked = lock.lock();
1772 if get_queue_len() > 0 {
1773 continue;
1775 }
1776
1777 if !is_sleep.load(Ordering::Relaxed) {
1778 is_sleep.store(true, Ordering::SeqCst);
1780 if condvar
1781 .wait_for(
1782 &mut locked,
1783 Duration::from_millis(sleep_timeout),
1784 )
1785 .timed_out()
1786 {
1787 is_sleep.store(false, Ordering::SeqCst);
1789 }
1790 }
1791
1792 continue; }
1794
1795 sleep_count += 1; if let Some(interval) = &loop_interval {
1797 if let Some(remaining_interval) = Duration::from_millis(*interval).checked_sub(run_time){
1799 thread::sleep(remaining_interval);
1801 }
1802 }
1803 } else {
1804 sleep_count = 0; if let Some(interval) = &loop_interval {
1807 if let Some(remaining_interval) = Duration::from_millis(*interval).checked_sub(run_time){
1809 thread::sleep(remaining_interval);
1811 }
1812 }
1813 }
1814 }
1815 });
1816
1817 thread_status_copy
1818}
1819
1820pub fn wakeup_worker_thread<O, P>(worker_waker: &Arc<(AtomicBool, Mutex<()>, Condvar)>,
1822 rt: &SingleTaskRuntime<O, P>)
1823 where O: Default + 'static,
1824 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P> {
1825 if worker_waker.0.load(Ordering::Relaxed) && rt.len() > 0 {
1827 let (is_sleep, lock, condvar) = &**worker_waker;
1828 let locked = lock.lock();
1829 is_sleep.store(false, Ordering::SeqCst); let _ = condvar.notify_one();
1831 }
1832}