1use std::thread;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::ptr::null_mut;
8use std::vec::IntoIter;
9use std::future::Future;
10use std::panic::set_hook;
11use std::any::{Any, TypeId};
12use std::marker::PhantomData;
13use std::ops::{Deref, DerefMut};
14use std::cell::{RefCell, UnsafeCell};
15use std::task::{Waker, Context, Poll};
16use std::time::{Duration, SystemTime};
17use std::io::{Error, Result, ErrorKind};
18use std::alloc::{Layout, set_alloc_error_hook};
19use std::fmt::{Debug, Formatter, Result as FmtResult};
20use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, AtomicPtr, Ordering};
21
22pub mod single_thread;
23pub mod multi_thread;
24pub mod worker_thread;
25pub mod serial;
26pub mod serial_local_thread;
27pub mod serial_single_thread;
28pub mod serial_worker_thread;
29pub mod serial_local_compatible_wasm_runtime;
30
31use libc;
32use futures::{future::{FutureExt, BoxFuture},
33 stream::{Stream, BoxStream},
34 task::ArcWake};
35use parking_lot::{Mutex, Condvar};
36use crossbeam_channel::{Sender, Receiver, unbounded};
37use crossbeam_queue::ArrayQueue;
38use crossbeam_utils::atomic::AtomicCell;
39use flume::{Sender as AsyncSender, Receiver as AsyncReceiver};
40use num_cpus;
41use backtrace::Backtrace;
42use slotmap::{Key, KeyData};
43use quanta::{Clock, Upkeep, Handle, Instant as QInstant};
44
45use pi_hash::XHashMap;
46use pi_cancel_timer::Timer;
47use pi_timer::Timer as NotCancelTimer;
48
49use single_thread::SingleTaskRuntime;
50use worker_thread::{WorkerTaskRunner, WorkerRuntime};
51use multi_thread::{MultiTaskRuntimeBuilder, MultiTaskRuntime};
52
53use crate::lock::spin;
54
55thread_local! {
59 static PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME: AtomicPtr<()> = AtomicPtr::new(null_mut());
60 static PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME_DICT: UnsafeCell<XHashMap<TypeId, Box<dyn Any + 'static>>> = UnsafeCell::new(XHashMap::default());
61}
62
63thread_local! {
67 static PI_ASYNC_THREAD_LOCAL_ID: UnsafeCell<usize> = UnsafeCell::new(usize::MAX);
68}
69
70const DEFAULT_MAX_HIGH_PRIORITY_BOUNDED: usize = 10;
74
75const DEFAULT_HIGH_PRIORITY_BOUNDED: usize = 5;
79
80const DEFAULT_MAX_LOW_PRIORITY_BOUNDED: usize = 0;
84
85static RUNTIME_UID_GEN: AtomicUsize = AtomicUsize::new(1);
89
90static GLOBAL_TIME_LOOP_STATUS: AtomicBool = AtomicBool::new(false);
94
95pub fn startup_global_time_loop(interval: u64) -> Option<GlobalTimeLoopHandle> {
100 if let Err(_) = GLOBAL_TIME_LOOP_STATUS.compare_exchange(false,
101 true,
102 Ordering::AcqRel,
103 Ordering::Relaxed) {
104 None
106 } else {
107 let timer = Upkeep::new_with_clock(Duration::from_millis(interval), Clock::new());
109 let handle = timer.start().unwrap();
110 let clock = Clock::new();
111 let _now = clock.recent();
112
113 Some(GlobalTimeLoopHandle(handle))
114 }
115}
116
117pub struct GlobalTimeLoopHandle(Handle);
121
122impl Drop for GlobalTimeLoopHandle {
123 fn drop(&mut self) {
124 GLOBAL_TIME_LOOP_STATUS.store(false, Ordering::Release);
125 }
126}
127
128pub fn alloc_rt_uid() -> usize {
132 RUNTIME_UID_GEN.fetch_add(1, Ordering::Relaxed)
133}
134
135pub struct TaskId(UnsafeCell<u128>);
139
140impl Debug for TaskId {
141 fn fmt(&self, f: &mut Formatter) -> FmtResult {
142 write!(f, "TaskId[inner = {}]", unsafe { *self.0.get() })
143 }
144}
145
146impl Clone for TaskId {
147 fn clone(&self) -> Self {
148 unsafe {
149 TaskId(UnsafeCell::new(*self.0.get()))
150 }
151 }
152}
153
154impl TaskId {
155 #[inline]
157 pub fn exist_waker<R: 'static>(&self) -> bool {
158 unsafe {
159 let handle = unsafe { TaskHandle::<R>::from_raw((*self.0.get() >> 64) as *const ()) };
160 let inner = &*handle.0;
161 let r = if let Some(waker) = inner.0.swap(None) {
162 inner.0.swap(Some(waker));
163 true
164 } else {
165 false
166 };
167
168 handle.into_raw();
170
171 r
172 }
173 }
174
175 #[inline]
177 pub fn wakeup<R: 'static>(&self) {
178 unsafe {
179 let handle = unsafe { TaskHandle::<R>::from_raw((*self.0.get() >> 64) as *const ()) };
180 let inner = &*handle.0;
181 if let Some(waker) = inner.0.swap(None) {
182 waker.wake();
184 }
185
186 handle.into_raw();
188 }
189 }
190
191 #[inline]
193 pub fn set_waker<R: 'static>(&self, waker: Waker) -> Option<Waker> {
194 unsafe {
195 let handle = unsafe { TaskHandle::<R>::from_raw((*self.0.get() >> 64) as *const ()) };
196 let inner = &*handle.0;
197 let r = inner.0.swap(Some(waker));
198
199 handle.into_raw();
201
202 r
203 }
204 }
205
206 #[inline]
208 pub fn result<R: 'static>(&self) -> Option<R> {
209 unsafe {
210 let handle = unsafe { TaskHandle::<R>::from_raw((*self.0.get() >> 64) as *const ()) };
211 let inner = &*handle.0;
212 let r = inner.1.swap(None);
213
214 handle.into_raw();
216
217 r
218 }
219 }
220
221 #[inline]
223 pub fn set_result<R: 'static>(&self, result: R) -> Option<R> {
224 unsafe {
225 let handle = unsafe { TaskHandle::<R>::from_raw((*self.0.get() >> 64) as *const ()) };
226 let inner = &*handle.0;
227 let r = inner.1.swap(Some(result));
228
229 handle.into_raw();
231
232 r
233 }
234 }
235}
236
237pub(crate) struct TaskHandle<R: 'static>(Box<(
239 AtomicCell<Option<Waker>>, AtomicCell<Option<R>>, )>);
242
243impl<R: 'static> Default for TaskHandle<R> {
244 fn default() -> Self {
245 TaskHandle(Box::new((AtomicCell::new(None), AtomicCell::new(None))))
246 }
247}
248
249impl<R: 'static> TaskHandle<R> {
250 pub unsafe fn from_raw(raw: *const ()) -> TaskHandle<R> {
252 let inner
253 = Box::from_raw(raw as *const (AtomicCell<Option<Waker>>, AtomicCell<Option<R>>) as *mut (AtomicCell<Option<Waker>>, AtomicCell<Option<R>>));
254 TaskHandle(inner)
255 }
256
257 pub fn into_raw(self) -> *const () {
259 Box::into_raw(self.0)
260 as *mut (AtomicCell<Option<Waker>>, AtomicCell<Option<R>>)
261 as *const (AtomicCell<Option<Waker>>, AtomicCell<Option<R>>)
262 as *const ()
263 }
264}
265
266pub struct AsyncTask<
270 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
271 O: Default + 'static = (),
272> {
273 uid: TaskId, future: Mutex<Option<BoxFuture<'static, O>>>, pool: Arc<P>, priority: usize, context: Option<UnsafeCell<Box<dyn Any>>>, }
279
280impl<
281 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
282 O: Default + 'static,
283> Drop for AsyncTask<P, O> {
284 fn drop(&mut self) {
285 let _ = unsafe { TaskHandle::<O>::from_raw((*self.uid.0.get() >> 64) as usize as *const ()) };
286 }
287}
288
289unsafe impl<
290 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
291 O: Default + 'static,
292> Send for AsyncTask<P, O> {}
293unsafe impl<
294 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
295 O: Default + 'static,
296> Sync for AsyncTask<P, O> {}
297
298impl<
299 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
300 O: Default + 'static,
301> ArcWake for AsyncTask<P, O> {
302 #[cfg(not(target_arch = "aarch64"))]
303 fn wake_by_ref(arc_self: &Arc<Self>) {
304 let pool = arc_self.get_pool();
305 let _ = pool.push_keep(arc_self.clone());
306
307 if let Some(waits) = pool.get_waits() {
308 if let Some(worker_waker) = waits.pop() {
310 let (is_sleep, lock, condvar) = &*worker_waker;
312 let _locked = lock.lock();
313 if is_sleep.load(Ordering::Relaxed) {
314 if let Ok(true) = is_sleep
316 .compare_exchange_weak(true,
317 false,
318 Ordering::SeqCst,
319 Ordering::SeqCst) {
320 condvar.notify_one();
322 }
323 }
324 }
325 } else {
326 if let Some(thread_waker) = pool.get_thread_waker() {
328 if thread_waker.0.load(Ordering::Relaxed) {
330 let (is_sleep, lock, condvar) = &**thread_waker;
331 let _locked = lock.lock();
332 if let Ok(true) = is_sleep
334 .compare_exchange_weak(true,
335 false,
336 Ordering::SeqCst,
337 Ordering::SeqCst) {
338 condvar.notify_one();
340 }
341 }
342 }
343 }
344 }
345 #[cfg(target_arch = "aarch64")]
346 fn wake_by_ref(arc_self: &Arc<Self>) {
347 let pool = arc_self.get_pool();
348 let _ = pool.push_keep(arc_self.clone());
349
350 if let Some(waits) = pool.get_waits() {
351 if let Some(worker_waker) = waits.pop() {
353 let (is_sleep, lock, condvar) = &*worker_waker;
355 let locked = lock.lock();
356 if is_sleep.load(Ordering::Relaxed) {
357 if let Ok(true) = is_sleep
359 .compare_exchange(true,
360 false,
361 Ordering::SeqCst,
362 Ordering::SeqCst) {
363 condvar.notify_one();
365 }
366 }
367 }
368 } else {
369 if let Some(thread_waker) = pool.get_thread_waker() {
371 if thread_waker.0.load(Ordering::Relaxed) {
373 let (is_sleep, lock, condvar) = &**thread_waker;
374 let locked = lock.lock();
375 if let Ok(true) = is_sleep
377 .compare_exchange(true,
378 false,
379 Ordering::SeqCst,
380 Ordering::SeqCst) {
381 condvar.notify_one();
383 }
384 }
385 }
386 }
387 }
388}
389
390impl<
391 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
392 O: Default + 'static,
393> AsyncTask<P, O> {
394 pub fn new(uid: TaskId,
396 pool: Arc<P>,
397 priority: usize,
398 future: Option<BoxFuture<'static, O>>) -> AsyncTask<P, O> {
399 AsyncTask {
400 uid,
401 future: Mutex::new(future),
402 pool,
403 priority,
404 context: None,
405 }
406 }
407
408 pub fn with_context<C: 'static>(uid: TaskId,
410 pool: Arc<P>,
411 priority: usize,
412 future: Option<BoxFuture<'static, O>>,
413 context: C) -> AsyncTask<P, O> {
414 let any = Box::new(context);
415
416 AsyncTask {
417 uid,
418 future: Mutex::new(future),
419 pool,
420 priority,
421 context: Some(UnsafeCell::new(any)),
422 }
423 }
424
425 pub fn with_runtime_and_context<RT, C>(runtime: &RT,
427 priority: usize,
428 future: Option<BoxFuture<'static, O>>,
429 context: C) -> AsyncTask<P, O>
430 where RT: AsyncRuntime<O, Pool = P>,
431 C: Send + 'static {
432 let any = Box::new(context);
433
434 AsyncTask {
435 uid: runtime.alloc::<O>(),
436 future: Mutex::new(future),
437 pool: runtime.shared_pool(),
438 priority,
439 context: Some(UnsafeCell::new(any)),
440 }
441 }
442
443 pub fn is_enable_wakeup(&self) -> bool {
445 self.uid.exist_waker::<O>()
446 }
447
448 pub fn get_inner(&self) -> Option<BoxFuture<'static, O>> {
450 self.future.lock().take()
451 }
452
453 pub fn set_inner(&self, inner: Option<BoxFuture<'static, O>>) {
455 *self.future.lock() = inner;
456 }
457
458 #[inline]
460 pub fn owner(&self) -> usize {
461 unsafe {
462 *self.uid.0.get() as usize
463 }
464 }
465
466 #[inline]
468 pub fn priority(&self) -> usize {
469 self.priority
470 }
471
472 pub fn exist_context(&self) -> bool {
474 self.context.is_some()
475 }
476
477 pub fn get_context<C: Send + 'static>(&self) -> Option<&C> {
479 if let Some(context) = &self.context {
480 let any = unsafe { &*context.get() };
482 return <dyn Any>::downcast_ref::<C>(&**any);
483 }
484
485 None
486 }
487
488 pub fn get_context_mut<C: Send + 'static>(&self) -> Option<&mut C> {
490 if let Some(context) = &self.context {
491 let any = unsafe { &mut *context.get() };
493 return <dyn Any>::downcast_mut::<C>(&mut **any);
494 }
495
496 None
497 }
498
499 pub fn set_context<C: Send + 'static>(&self, new: C) {
501 if let Some(context) = &self.context {
502 let _ = unsafe { &*context.get() };
504
505 let any: Box<dyn Any + 'static> = Box::new(new);
507 unsafe { *context.get() = any; }
508 }
509 }
510
511 pub fn get_pool(&self) -> &P {
513 self.pool.as_ref()
514 }
515}
516
517pub trait AsyncTaskPool<O: Default + 'static = ()>: Default + Send + Sync + 'static {
521 type Pool: AsyncTaskPoolExt<O> + AsyncTaskPool<O>;
522
523 fn get_thread_id(&self) -> usize;
525
526 fn len(&self) -> usize;
528
529 fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
531
532 fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
534
535 fn push_priority(&self,
537 priority: usize,
538 task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
539
540 fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()>;
542
543 fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>>;
545
546 fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>>;
548
549 fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>>;
551}
552
553pub trait AsyncTaskPoolExt<O: Default + 'static = ()>: Send + Sync + 'static {
557 fn set_waits(&mut self,
559 _waits: Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>) {}
560
561 fn get_waits(&self) -> Option<&Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>> {
563 None
565 }
566
567 fn idler_len(&self) -> usize {
569 0
571 }
572
573 fn spawn_worker(&self) -> Option<usize> {
575 None
577 }
578
579 fn worker_len(&self) -> usize {
581 #[cfg(not(target_arch = "wasm32"))]
583 return num_cpus::get();
584 #[cfg(target_arch = "wasm32")]
585 return 1;
586 }
587
588 fn buffer_len(&self) -> usize {
590 0
592 }
593
594 fn set_thread_waker(&mut self, _thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) {
596 }
598
599 fn clone_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
601 None
603 }
604
605 fn close_worker(&self) {
607 }
609}
610
611pub trait AsyncRuntime<O: Default + 'static = ()>: Clone + Send + Sync + 'static {
615 type Pool: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = Self::Pool>;
616
617 fn shared_pool(&self) -> Arc<Self::Pool>;
619
620 fn get_id(&self) -> usize;
622
623 fn wait_len(&self) -> usize;
625
626 fn len(&self) -> usize;
628
629 fn alloc<R: 'static>(&self) -> TaskId;
631
632 fn spawn<F>(&self, future: F) -> Result<TaskId>
634 where F: Future<Output = O> + Send + 'static;
635
636 fn spawn_local<F>(&self, future: F) -> Result<TaskId>
638 where F: Future<Output = O> + Send + 'static;
639
640 fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
642 where F: Future<Output = O> + Send + 'static;
643
644 fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
646 where F: Future<Output = O> + Send + 'static;
647
648 fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
650 where F: Future<Output = O> + Send + 'static;
651
652 fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
654 where F: Future<Output = O> + Send + 'static;
655
656 fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
658 where F: Future<Output = O> + Send + 'static;
659
660 fn spawn_priority_by_id<F>(&self,
662 task_id: TaskId,
663 priority: usize,
664 future: F) -> Result<()>
665 where F: Future<Output = O> + Send + 'static;
666
667 fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
669 where F: Future<Output = O> + Send + 'static;
670
671 fn spawn_timing_by_id<F>(&self,
673 task_id: TaskId,
674 future: F,
675 time: usize) -> Result<()>
676 where F: Future<Output = O> + Send + 'static;
677
678 fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output>;
680
681 fn wakeup<Output: 'static>(&self, task_id: &TaskId);
683
684 fn wait<V: Send + 'static>(&self) -> AsyncWait<V>;
686
687 fn wait_any<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAny<V>;
689
690 fn wait_any_callback<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V>;
692
693 fn map_reduce<V: Send + 'static>(&self, capacity: usize) -> AsyncMapReduce<V>;
695
696 fn timeout(&self, timeout: usize) -> BoxFuture<'static, ()>;
698
699 fn yield_now(&self) -> BoxFuture<'static, ()>;
701
702 fn pipeline<S, SO, F, FO>(&self, input: S, filter: F) -> BoxStream<'static, FO>
704 where S: Stream<Item = SO> + Send + 'static,
705 SO: Send + 'static,
706 F: FnMut(SO) -> AsyncPipelineResult<FO> + Send + 'static,
707 FO: Send + 'static;
708
709 fn close(&self) -> bool;
711}
712
713pub trait AsyncRuntimeExt<O: Default + 'static = ()> {
717 fn spawn_with_context<F, C>(&self,
719 task_id: TaskId,
720 future: F,
721 context: C) -> Result<()>
722 where F: Future<Output = O> + Send + 'static,
723 C: 'static;
724
725 fn spawn_timing_with_context<F, C>(&self,
727 task_id: TaskId,
728 future: F,
729 context: C,
730 time: usize) -> Result<()>
731 where F: Future<Output = O> + Send + 'static,
732 C: Send + 'static;
733
734 fn block_on<F>(&self, future: F) -> Result<F::Output>
736 where F: Future + Send + 'static,
737 <F as Future>::Output: Default + Send + 'static;
738}
739
740pub struct AsyncRuntimeBuilder<O: Default + 'static = ()>(PhantomData<O>);
744
745impl<O: Default + 'static> AsyncRuntimeBuilder<O> {
746 pub fn default_worker_thread(worker_name: Option<&str>,
748 worker_stack_size: Option<usize>,
749 worker_sleep_timeout: Option<u64>,
750 worker_loop_interval: Option<Option<u64>>) -> WorkerRuntime<O> {
751 let runner = WorkerTaskRunner::default();
752
753 let thread_name = if let Some(name) = worker_name {
754 name
755 } else {
756 "Default-Single-Worker"
758 };
759 let thread_stack_size = if let Some(size) = worker_stack_size {
760 size
761 } else {
762 2 * 1024 * 1024
764 };
765 let sleep_timeout = if let Some(timeout) = worker_sleep_timeout {
766 timeout
767 } else {
768 1
770 };
771 let loop_interval = if let Some(interval) = worker_loop_interval {
772 interval
773 } else {
774 None
776 };
777
778 let clock = Clock::new();
780 let runner_copy = runner.clone();
781 let rt_copy = runner.get_runtime();
782 let rt = runner.startup(
783 thread_name,
784 thread_stack_size,
785 sleep_timeout,
786 loop_interval,
787 move || {
788 let last = clock.recent();
789 match runner_copy.run_once() {
790 Err(e) => {
791 panic!("Run runner failed, reason: {:?}", e);
792 },
793 Ok(len) => {
794 (len == 0,
795 clock
796 .recent()
797 .duration_since(last))
798 },
799 }
800 },
801 move || {
802 rt_copy.wait_len() + rt_copy.len()
803 },
804 );
805
806 rt
807 }
808
809 pub fn custom_worker_thread<P, F0, F1>(pool: P,
811 worker_handle: Arc<AtomicBool>,
812 worker_condvar: Arc<(AtomicBool, Mutex<()>, Condvar)>,
813 thread_name: &str,
814 thread_stack_size: usize,
815 sleep_timeout: u64,
816 loop_interval: Option<u64>,
817 loop_func: F0,
818 get_queue_len: F1) -> WorkerRuntime<O, P>
819 where P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
820 F0: Fn() -> (bool, Duration) + Send + 'static,
821 F1: Fn() -> usize + Send + 'static {
822 let runner = WorkerTaskRunner::new(pool,
823 worker_handle,
824 worker_condvar);
825
826 let rt_copy = runner.get_runtime();
828 let rt = runner.startup(
829 thread_name,
830 thread_stack_size,
831 sleep_timeout,
832 loop_interval,
833 loop_func,
834 move || {
835 rt_copy.wait_len() + get_queue_len()
836 },
837 );
838
839 rt
840 }
841
842 pub fn default_multi_thread(worker_prefix: Option<&str>,
844 worker_stack_size: Option<usize>,
845 worker_size: Option<usize>,
846 worker_sleep_timeout: Option<u64>) -> MultiTaskRuntime<O> {
847 let mut builder = MultiTaskRuntimeBuilder::default();
848
849 if let Some(thread_prefix) = worker_prefix {
850 builder = builder.thread_prefix(thread_prefix);
851 }
852 if let Some(thread_stack_size) = worker_stack_size {
853 builder = builder.thread_stack_size(thread_stack_size);
854 }
855 if let Some(size) = worker_size {
856 builder = builder
857 .init_worker_size(size)
858 .set_worker_limit(size, size);
859 }
860 if let Some(sleep_timeout) = worker_sleep_timeout {
861 builder = builder.set_timeout(sleep_timeout);
862 }
863
864 builder.build()
865 }
866
867 pub fn custom_multi_thread<P>(pool: P,
869 worker_prefix: &str,
870 worker_stack_size: usize,
871 worker_size: usize,
872 worker_sleep_timeout: u64,
873 worker_timer_interval: usize) -> MultiTaskRuntime<O, P>
874 where P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P> {
875 MultiTaskRuntimeBuilder::new(pool)
876 .thread_prefix(worker_prefix)
877 .thread_stack_size(worker_stack_size)
878 .init_worker_size(worker_size)
879 .set_worker_limit(worker_size, worker_size)
880 .set_timeout(worker_sleep_timeout)
881 .set_timer_interval(worker_timer_interval)
882 .build()
883 }
884}
885
886pub fn bind_local_thread<O: Default + 'static>(runtime: LocalAsyncRuntime<O>) {
888 match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |rt| {
889 let raw = Arc::into_raw(Arc::new(runtime)) as *mut LocalAsyncRuntime<O> as *mut ();
890 rt.store(raw, Ordering::Relaxed);
891 }) {
892 Err(e) => {
893 panic!("Bind single runtime to local thread failed, reason: {:?}", e);
894 },
895 Ok(_) => (),
896 }
897}
898
899pub fn unbind_local_thread() {
901 let _ = PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |rt| {
902 rt.store(null_mut(), Ordering::Relaxed);
903 });
904}
905
906pub struct LocalAsyncRuntime<O: Default + 'static> {
910 inner: *const (), get_id_func: fn(*const ()) -> usize, spawn_func: fn(*const (), BoxFuture<'static, O>) -> Result<()>, spawn_timing_func: fn(*const (), BoxFuture<'static, O>, usize) -> Result<()>, timeout_func: fn(*const (), usize) -> BoxFuture<'static, ()>, }
916
917unsafe impl<O: Default + 'static> Send for LocalAsyncRuntime<O> {}
918unsafe impl<O: Default + 'static> Sync for LocalAsyncRuntime<O> {}
919
920impl<O: Default + 'static> LocalAsyncRuntime<O> {
921 pub fn new(inner: *const (),
923 get_id_func: fn(*const ()) -> usize,
924 spawn_func: fn(*const (), BoxFuture<'static, O>) -> Result<()>,
925 spawn_timing_func: fn(*const (), BoxFuture<'static, O>, usize) -> Result<()>,
926 timeout_func: fn(*const (), usize) -> BoxFuture<'static, ()>) -> Self {
927 LocalAsyncRuntime {
928 inner,
929 get_id_func,
930 spawn_func,
931 spawn_timing_func,
932 timeout_func,
933 }
934 }
935
936 #[inline]
938 pub fn get_id(&self) -> usize {
939 (self.get_id_func)(self.inner)
940 }
941
942 #[inline]
944 pub fn spawn<F>(&self, future: F) -> Result<()>
945 where F: Future<Output = O> + Send + 'static {
946 (self.spawn_func)(self.inner, async move {
947 future.await
948 }.boxed())
949 }
950
951 #[inline]
953 pub fn sapwn_timing_func<F>(&self, future: F, timeout: usize) -> Result<()>
954 where F: Future<Output = O> + Send + 'static {
955 (self.spawn_timing_func)(self.inner,
956 async move {
957 future.await
958 }.boxed(),
959 timeout)
960 }
961
962 #[inline]
964 pub fn timeout(&self, timeout: usize) -> BoxFuture<'static, ()> {
965 (self.timeout_func)(self.inner, timeout)
966 }
967}
968
969pub fn local_async_runtime<O: Default + 'static>() -> Option<Arc<LocalAsyncRuntime<O>>> {
974 match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |ptr| {
975 let raw = ptr.load(Ordering::Relaxed) as *const LocalAsyncRuntime<O>;
976 unsafe {
977 if raw.is_null() {
978 None
980 } else {
981 let shared: Arc<LocalAsyncRuntime<O>> = unsafe { Arc::from_raw(raw) };
983 let result = shared.clone();
984 Arc::into_raw(shared); Some(result)
986 }
987 }
988 }) {
989 Err(_) => None, Ok(rt) => rt,
991 }
992}
993
994pub fn spawn_local<O, F>(future: F) -> Result<()>
999 where O: Default + 'static,
1000 F: Future<Output = O> + Send + 'static {
1001 if let Some(rt) = local_async_runtime::<O>() {
1002 rt.spawn(future)
1003 } else {
1004 Err(Error::new(ErrorKind::Other, format!("Spawn task to local thread failed, reason: runtime not exist")))
1005 }
1006}
1007
1008pub fn get_local_dict<T: 'static>() -> Option<&'static T> {
1012 match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME_DICT.try_with(move |dict| {
1013 unsafe {
1014 if let Some(any) = (&*dict.get()).get(&TypeId::of::<T>()) {
1015 <dyn Any>::downcast_ref::<T>(&**any)
1017 } else {
1018 None
1020 }
1021 }
1022 }) {
1023 Err(_) => {
1024 None
1025 },
1026 Ok(result) => {
1027 result
1028 }
1029 }
1030}
1031
1032pub fn get_local_dict_mut<T: 'static>() -> Option<&'static mut T> {
1036 match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME_DICT.try_with(move |dict| {
1037 unsafe {
1038 if let Some(any) = (&mut *dict.get()).get_mut(&TypeId::of::<T>()) {
1039 <dyn Any>::downcast_mut::<T>(&mut **any)
1041 } else {
1042 None
1044 }
1045 }
1046 }) {
1047 Err(_) => {
1048 None
1049 },
1050 Ok(result) => {
1051 result
1052 }
1053 }
1054}
1055
1056pub fn set_local_dict<T: 'static>(value: T) -> Option<T> {
1060 match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME_DICT.try_with(move |dict| {
1061 unsafe {
1062 let result = if let Some(any) = (&mut *dict.get()).remove(&TypeId::of::<T>()) {
1063 if let Ok(r) = any.downcast() {
1065 Some(*r)
1067 } else {
1068 None
1069 }
1070 } else {
1071 None
1073 };
1074
1075 (&mut *dict.get()).insert(TypeId::of::<T>(), Box::new(value) as Box<dyn Any>);
1077
1078 result
1079 }
1080 }) {
1081 Err(_) => {
1082 None
1083 },
1084 Ok(result) => {
1085 result
1086 }
1087 }
1088}
1089
1090pub fn remove_local_dict<T: 'static>() -> Option<T> {
1094 match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME_DICT.try_with(move |dict| {
1095 unsafe {
1096 if let Some(any) = (&mut *dict.get()).remove(&TypeId::of::<T>()) {
1097 if let Ok(r) = any.downcast() {
1099 Some(*r)
1101 } else {
1102 None
1103 }
1104 } else {
1105 None
1107 }
1108 }
1109 }) {
1110 Err(_) => {
1111 None
1112 },
1113 Ok(result) => {
1114 result
1115 }
1116 }
1117}
1118
1119pub fn clear_local_dict() -> Result<()> {
1123 match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME_DICT.try_with(move |dict| {
1124 unsafe {
1125 (&mut *dict.get()).clear();
1126 }
1127 }) {
1128 Err(e) => {
1129 Err(Error::new(ErrorKind::Other, format!("Clear local dict failed, reason: {:?}", e)))
1130 },
1131 Ok(_) => {
1132 Ok(())
1133 }
1134 }
1135}
1136
1137pub struct AsyncValue<V: Send + 'static>(Arc<InnerAsyncValue<V>>);
1141
1142unsafe impl<V: Send + 'static> Send for AsyncValue<V> {}
1143unsafe impl<V: Send + 'static> Sync for AsyncValue<V> {}
1144
1145impl<V: Send + 'static> Clone for AsyncValue<V> {
1146 fn clone(&self) -> Self {
1147 AsyncValue(self.0.clone())
1148 }
1149}
1150
1151impl<V: Send + 'static> Debug for AsyncValue<V> {
1152 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1153 write!(f,
1154 "AsyncValue[status = {}]",
1155 self.0.status.load(Ordering::Acquire))
1156 }
1157}
1158
1159impl<V: Send + 'static> Future for AsyncValue<V> {
1160 type Output = V;
1161
1162 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1163 let mut spin_len = 1;
1164 while self.0.status.load(Ordering::Acquire) == 2 {
1165 spin_len = spin(spin_len);
1167 }
1168
1169 if self.0.status.load(Ordering::Acquire) == 3 {
1170 if let Some(value) = unsafe { (*(&self).0.value.get()).take() } {
1171 return Poll::Ready(value);
1173 }
1174 }
1175
1176 unsafe {
1177 *self.0.waker.get() = Some(cx.waker().clone()); }
1179
1180 let mut spin_len = 1;
1181 loop {
1182 match self.0.status.compare_exchange(0,
1183 1, Ordering::Acquire,
1184 Ordering::Relaxed) {
1185 Err(2) => {
1186 spin_len = spin(spin_len);
1188 continue;
1189 },
1190 Err(3) => {
1191 let value = unsafe { (*(&self).0.value.get()).take().unwrap() };
1193 return Poll::Ready(value);
1194 },
1195 Err(_) => {
1196 unimplemented!();
1197 },
1198 Ok(_) => {
1199 return Poll::Pending;
1201 },
1202 }
1203 }
1204 }
1205}
1206
1207impl<V: Send + 'static> AsyncValue<V> {
1211 pub fn new() -> Self {
1213 let inner = InnerAsyncValue {
1214 value: UnsafeCell::new(None),
1215 waker: UnsafeCell::new(None),
1216 status: AtomicU8::new(0),
1217 };
1218
1219 AsyncValue(Arc::new(inner))
1220 }
1221
1222 pub fn is_complete(&self) -> bool {
1224 self
1225 .0
1226 .status
1227 .load(Ordering::Relaxed) == 3
1228 }
1229
1230 pub fn set(self, value: V) {
1232 loop {
1233 match self.0.status.compare_exchange(1,
1234 2,
1235 Ordering::Acquire,
1236 Ordering::Relaxed) {
1237 Err(0) => {
1238 match self.0.status.compare_exchange(0,
1239 2,
1240 Ordering::Acquire,
1241 Ordering::Relaxed) {
1242 Err(1) => {
1243 continue;
1245 },
1246 Err(_) => {
1247 return;
1249 },
1250 Ok(_) => {
1251 unsafe { *self.0.value.get() = Some(value); }
1253 self.0.status.store(3, Ordering::Release);
1254 return;
1255 }
1256 }
1257 },
1258 Err(_) => {
1259 return;
1261 },
1262 Ok(_) => {
1263 break;
1265 }
1266 }
1267 }
1268
1269 unsafe { *self.0.value.get() = Some(value); }
1271 self.0.status.store(3, Ordering::Release);
1272 let waker = unsafe { (*self.0.waker.get()).take().unwrap() };
1273 waker.wake();
1274 }
1275}
1276
1277pub struct InnerAsyncValue<V: Send + 'static> {
1279 value: UnsafeCell<Option<V>>, waker: UnsafeCell<Option<Waker>>, status: AtomicU8, }
1283
1284pub struct AsyncVariableGuard<'a, V: Send + 'static> {
1288 value: &'a UnsafeCell<Option<V>>, waker: &'a UnsafeCell<Option<Waker>>, status: &'a AtomicU8, }
1292
1293unsafe impl<V: Send + 'static> Send for AsyncVariableGuard<'_, V> {}
1294
1295impl<V: Send + 'static> Drop for AsyncVariableGuard<'_, V> {
1296 fn drop(&mut self) {
1297 self.status.fetch_sub(2, Ordering::Relaxed);
1301 }
1302}
1303
1304impl<V: Send + 'static> Deref for AsyncVariableGuard<'_, V> {
1305 type Target = Option<V>;
1306
1307 fn deref(&self) -> &Self::Target {
1308 unsafe {
1309 &*self.value.get()
1310 }
1311 }
1312}
1313
1314impl<V: Send + 'static> DerefMut for AsyncVariableGuard<'_, V> {
1315 fn deref_mut(&mut self) -> &mut Self::Target {
1316 unsafe {
1317 &mut *self.value.get()
1318 }
1319 }
1320}
1321
1322impl<V: Send + 'static> AsyncVariableGuard<'_, V> {
1323 pub fn finish(self) {
1325 if self.status.fetch_add(4, Ordering::Relaxed) == 3 {
1327 if let Some(waker) = unsafe { (&mut *self.waker.get()).take() } {
1328 waker.wake();
1330 }
1331 }
1332 }
1333}
1334
1335pub struct AsyncVariable<V: Send + 'static>(Arc<InnerAsyncVariable<V>>);
1339
1340unsafe impl<V: Send + 'static> Send for AsyncVariable<V> {}
1341unsafe impl<V: Send + 'static> Sync for AsyncVariable<V> {}
1342
1343impl<V: Send + 'static> Clone for AsyncVariable<V> {
1344 fn clone(&self) -> Self {
1345 AsyncVariable(self.0.clone())
1346 }
1347}
1348
1349impl<V: Send + 'static> Future for AsyncVariable<V> {
1350 type Output = V;
1351
1352 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1353 unsafe {
1354 *self.0.waker.get() = Some(cx.waker().clone()); }
1356
1357 let mut spin_len = 1;
1358 loop {
1359 match self.0.status.compare_exchange(0,
1360 1,
1361 Ordering::Acquire,
1362 Ordering::Relaxed) {
1363 Err(current) if current & 4 != 0 => {
1364 unsafe {
1366 let _ = (&mut *self.0.waker.get()).take(); return Poll::Ready((&mut *(&self).0.value.get()).take().unwrap());
1368 }
1369 },
1370 Err(_) => {
1371 spin_len = spin(spin_len);
1373 },
1374 Ok(_) => {
1375 return Poll::Pending;
1377 },
1378 }
1379 }
1380 }
1381}
1382
1383impl<V: Send + 'static> AsyncVariable<V> {
1384 pub fn new() -> Self {
1386 let inner = InnerAsyncVariable {
1387 value: UnsafeCell::new(None),
1388 waker: UnsafeCell::new(None),
1389 status: AtomicU8::new(0),
1390 };
1391
1392 AsyncVariable(Arc::new(inner))
1393 }
1394
1395 pub fn is_complete(&self) -> bool {
1397 self
1398 .0
1399 .status
1400 .load(Ordering::Acquire) & 4 != 0
1401 }
1402
1403 pub fn lock(&self) -> Option<AsyncVariableGuard<V>> {
1405 let mut spin_len = 1;
1406 loop {
1407 match self
1408 .0
1409 .status
1410 .compare_exchange(1,
1411 3,
1412 Ordering::Acquire,
1413 Ordering::Relaxed) {
1414 Err(0) => {
1415 match self
1417 .0
1418 .status
1419 .compare_exchange(0,
1420 2,
1421 Ordering::Acquire,
1422 Ordering::Relaxed) {
1423 Err(1) => {
1424 continue;
1426 },
1427 Err(2) => {
1428 spin_len = spin(spin_len);
1430 },
1431 Err(3) => {
1432 spin_len = spin(spin_len);
1434 },
1435 Err(_) => {
1436 return None;
1438 },
1439 Ok(_) => {
1440 let guard = AsyncVariableGuard {
1442 value: &self.0.value,
1443 waker: &self.0.waker,
1444 status: &self.0.status,
1445 };
1446
1447 return Some(guard)
1448 },
1449 }
1450 },
1451 Err(2) => {
1452 spin_len = spin(spin_len);
1454 },
1455 Err(3) => {
1456 spin_len = spin(spin_len);
1458 },
1459 Err(_) => {
1460 return None;
1462 }
1463 Ok(_) => {
1464 let guard = AsyncVariableGuard {
1466 value: &self.0.value,
1467 waker: &self.0.waker,
1468 status: &self.0.status,
1469 };
1470
1471 return Some(guard)
1472 },
1473 }
1474 }
1475 }
1476}
1477
1478pub struct InnerAsyncVariable<V: Send + 'static> {
1480 value: UnsafeCell<Option<V>>, waker: UnsafeCell<Option<Waker>>, status: AtomicU8, }
1484
1485pub struct AsyncWaitResult<V: Send + 'static>(pub Arc<RefCell<Option<Result<V>>>>);
1489
1490unsafe impl<V: Send + 'static> Send for AsyncWaitResult<V> {}
1491unsafe impl<V: Send + 'static> Sync for AsyncWaitResult<V> {}
1492
1493impl<V: Send + 'static> Clone for AsyncWaitResult<V> {
1494 fn clone(&self) -> Self {
1495 AsyncWaitResult(self.0.clone())
1496 }
1497}
1498
1499pub struct AsyncWaitResults<V: Send + 'static>(pub Arc<RefCell<Option<Vec<Result<V>>>>>);
1503
1504unsafe impl<V: Send + 'static> Send for AsyncWaitResults<V> {}
1505unsafe impl<V: Send + 'static> Sync for AsyncWaitResults<V> {}
1506
1507impl<V: Send + 'static> Clone for AsyncWaitResults<V> {
1508 fn clone(&self) -> Self {
1509 AsyncWaitResults(self.0.clone())
1510 }
1511}
1512
1513pub enum AsyncTimingTask<
1517 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1518 O: Default + 'static = (),
1519> {
1520 Pended(TaskId), WaitRun(Arc<AsyncTask<P, O>>), }
1523
1524pub struct AsyncTaskTimer<
1528 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1529 O: Default + 'static = (),
1530> {
1531 producor: Sender<(usize, AsyncTimingTask<P, O>)>, consumer: Receiver<(usize, AsyncTimingTask<P, O>)>, timer: Arc<RefCell<Timer<AsyncTimingTask<P, O>, 1000, 60, 3>>>, clock: Clock, now: QInstant, }
1537
1538unsafe impl<
1539 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1540 O: Default + 'static,
1541> Send for AsyncTaskTimer<P, O> {}
1542unsafe impl<
1543 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1544 O: Default + 'static,
1545> Sync for AsyncTaskTimer<P, O> {}
1546
1547impl<
1548 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1549 O: Default + 'static,
1550> AsyncTaskTimer<P, O> {
1551 pub fn new() -> Self {
1553 let (producor, consumer) = unbounded();
1554 let clock = Clock::new();
1555 let now = clock.recent();
1556
1557 AsyncTaskTimer {
1558 producor,
1559 consumer,
1560 timer: Arc::new(RefCell::new(Timer::<AsyncTimingTask<P, O>, 1000, 60, 3>::default())),
1561 clock,
1562 now,
1563 }
1564 }
1565
1566 #[inline]
1568 pub fn get_producor(&self) -> &Sender<(usize, AsyncTimingTask<P, O>)> {
1569 &self.producor
1570 }
1571
1572 #[inline]
1574 pub fn len(&self) -> usize {
1575 let timer = self.timer.as_ref().borrow();
1576 timer.add_count() - timer.remove_count()
1577 }
1578
1579 pub fn set_timer(&self, task: AsyncTimingTask<P, O>, timeout: usize) -> usize {
1581 let current_time = self
1582 .clock
1583 .recent()
1584 .duration_since(self.now)
1585 .as_millis() as u64;
1586 self
1587 .timer
1588 .borrow_mut()
1589 .push_time(current_time + timeout as u64, task)
1590 .data()
1591 .as_ffi() as usize
1592 }
1593
1594 pub fn cancel_timer(&self, timer_ref: usize) -> Option<AsyncTimingTask<P, O>> {
1596 if let Some(item) = self
1597 .timer
1598 .borrow_mut()
1599 .cancel(KeyData::from_ffi(timer_ref as u64).into()) {
1600 Some(item)
1601 } else {
1602 None
1603 }
1604 }
1605
1606 pub fn consume(&self) -> usize {
1608 let timer_tasks = self.consumer.try_iter().collect::<Vec<(usize, AsyncTimingTask<P, O>)>>();
1609 let len = timer_tasks.len();
1610 for (timeout, task) in timer_tasks {
1611 self.set_timer(task, timeout);
1612 }
1613
1614 len
1615 }
1616
1617 pub fn is_require_pop(&self) -> Option<u64> {
1619 let current_time = self
1620 .clock
1621 .recent()
1622 .duration_since(self.now)
1623 .as_millis() as u64;
1624 if self.timer.borrow_mut().is_ok(current_time) {
1625 Some(current_time)
1626 } else {
1627 None
1628 }
1629 }
1630
1631 pub fn pop(&self, current_time: u64) -> Option<(usize, AsyncTimingTask<P, O>)> {
1633 if let Some((key, item)) = self.timer.borrow_mut().pop_kv(current_time) {
1634 Some((key.data().as_ffi() as usize, item))
1635 } else {
1636 None
1637 }
1638 }
1639}
1640
1641pub struct AsyncTaskTimerByNotCancel<
1645 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1646 O: Default + 'static = (),
1647> {
1648 producor: Sender<(usize, AsyncTimingTask<P, O>)>, consumer: Receiver<(usize, AsyncTimingTask<P, O>)>, timer: Arc<RefCell<NotCancelTimer<AsyncTimingTask<P, O>, 1000, 60, 3>>>, clock: Clock, now: QInstant, }
1654
1655unsafe impl<
1656 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1657 O: Default + 'static,
1658> Send for AsyncTaskTimerByNotCancel<P, O> {}
1659unsafe impl<
1660 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1661 O: Default + 'static,
1662> Sync for AsyncTaskTimerByNotCancel<P, O> {}
1663
1664impl<
1665 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1666 O: Default + 'static,
1667> AsyncTaskTimerByNotCancel<P, O> {
1668 pub fn new() -> Self {
1670 let (producor, consumer) = unbounded();
1671 let clock = Clock::new();
1672 let now = clock.recent();
1673
1674 AsyncTaskTimerByNotCancel {
1675 producor,
1676 consumer,
1677 timer: Arc::new(RefCell::new(NotCancelTimer::<AsyncTimingTask<P, O>, 1000, 60, 3>::default())),
1678 clock,
1679 now,
1680 }
1681 }
1682
1683 #[inline]
1685 pub fn get_producor(&self) -> &Sender<(usize, AsyncTimingTask<P, O>)> {
1686 &self.producor
1687 }
1688
1689 #[inline]
1691 pub fn len(&self) -> usize {
1692 let timer = self.timer.as_ref().borrow();
1693 timer.add_count() - timer.remove_count()
1694 }
1695
1696 pub fn set_timer(&self, task: AsyncTimingTask<P, O>, timeout: usize) {
1698 self
1699 .timer
1700 .borrow_mut()
1701 .push(timeout, task);
1702 }
1703
1704 pub fn consume(&self) -> usize {
1706 let timer_tasks = self.consumer.try_iter().collect::<Vec<(usize, AsyncTimingTask<P, O>)>>();
1707 let len = timer_tasks.len();
1708 for (timeout, task) in timer_tasks {
1709 self.set_timer(task, timeout);
1710 }
1711
1712 len
1713 }
1714
1715 pub fn is_require_pop(&self) -> Option<u64> {
1717 let current_time = self
1718 .clock
1719 .recent()
1720 .duration_since(self.now)
1721 .as_millis() as u64;
1722 if self.timer.borrow_mut().is_ok(current_time) {
1723 Some(current_time)
1724 } else {
1725 None
1726 }
1727 }
1728
1729 pub fn pop(&self, current_time: u64) -> Option<AsyncTimingTask<P, O>> {
1731 if let Some(item) = self.timer.borrow_mut().pop(current_time) {
1732 Some(item)
1733 } else {
1734 None
1735 }
1736 }
1737}
1738
1739pub struct AsyncWaitTimeout<
1743 RT: AsyncRuntime<O>,
1744 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1745 O: Default + 'static = (),
1746> {
1747 rt: RT, producor: Sender<(usize, AsyncTimingTask<P, O>)>, timeout: usize, expired: AtomicBool, }
1752
1753unsafe impl<
1754 RT: AsyncRuntime<O>,
1755 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1756 O: Default + 'static,
1757> Send for AsyncWaitTimeout<RT, P, O> {}
1758unsafe impl<
1759 RT: AsyncRuntime<O>,
1760 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1761 O: Default + 'static,
1762> Sync for AsyncWaitTimeout<RT, P, O> {}
1763
1764impl<
1765 RT: AsyncRuntime<O>,
1766 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1767 O: Default + 'static,
1768> Future for AsyncWaitTimeout<RT, P, O> {
1769 type Output = ();
1770
1771 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1772 if (&self).expired.load(Ordering::Relaxed) {
1773 return Poll::Ready(());
1775 } else {
1776 (&self).expired.store(true, Ordering::Relaxed);
1778 }
1779
1780 let task_id = self.rt.alloc::<O>();
1781 let reply = self.rt.pending(&task_id, cx.waker().clone());
1782
1783 let r = (&self).producor.send(((&self).timeout, AsyncTimingTask::Pended(task_id.clone())));
1785 reply
1786 }
1787}
1788
1789impl<
1790 RT: AsyncRuntime<O>,
1791 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1792 O: Default + 'static,
1793> AsyncWaitTimeout<RT, P, O> {
1794 pub fn new(rt: RT,
1796 producor: Sender<(usize, AsyncTimingTask<P, O>)>,
1797 timeout: usize) -> Self {
1798 AsyncWaitTimeout {
1799 rt,
1800 producor,
1801 timeout,
1802 expired: AtomicBool::new(false), }
1804 }
1805}
1806
1807pub struct LocalAsyncWaitTimeout<
1811 RT: AsyncRuntime<O>,
1812 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1813 O: Default + 'static = (),
1814> {
1815 rt: RT, timer: Arc<AsyncTaskTimerByNotCancel<P, O>>, timeout: usize, expired: AtomicBool, }
1820
1821unsafe impl<
1822 RT: AsyncRuntime<O>,
1823 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1824 O: Default + 'static,
1825> Send for LocalAsyncWaitTimeout<RT, P, O> {}
1826unsafe impl<
1827 RT: AsyncRuntime<O>,
1828 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1829 O: Default + 'static,
1830> Sync for LocalAsyncWaitTimeout<RT, P, O> {}
1831
1832impl<
1833 RT: AsyncRuntime<O>,
1834 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1835 O: Default + 'static,
1836> Future for LocalAsyncWaitTimeout<RT, P, O> {
1837 type Output = ();
1838
1839 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1840 if (&self).expired.load(Ordering::Relaxed) {
1841 return Poll::Ready(());
1843 } else {
1844 (&self).expired.store(true, Ordering::Relaxed);
1846 }
1847
1848 let task_id = self.rt.alloc::<O>();
1849 let reply = self.rt.pending(&task_id, cx.waker().clone());
1850
1851 (&self)
1853 .timer
1854 .set_timer(AsyncTimingTask::Pended(task_id.clone()),
1855 (&self).timeout);
1856 reply
1857 }
1858}
1859
1860impl<
1861 RT: AsyncRuntime<O>,
1862 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
1863 O: Default + 'static,
1864> LocalAsyncWaitTimeout<RT, P, O> {
1865 pub fn new(rt: RT,
1867 timer: Arc<AsyncTaskTimerByNotCancel<P, O>>,
1868 timeout: usize) -> Self {
1869 LocalAsyncWaitTimeout {
1870 rt,
1871 timer,
1872 timeout,
1873 expired: AtomicBool::new(false), }
1875 }
1876}
1877
1878pub struct AsyncWait<V: Send + 'static>(AsyncWaitAny<V>);
1882
1883unsafe impl<V: Send + 'static> Send for AsyncWait<V> {}
1884unsafe impl<V: Send + 'static> Sync for AsyncWait<V> {}
1885
1886impl<V: Send + 'static> AsyncWait<V> {
1890 pub fn spawn<RT, O, F>(&self,
1892 rt: RT,
1893 timeout: Option<usize>,
1894 future: F) -> Result<()>
1895 where RT: AsyncRuntime<O>,
1896 O: Default + 'static,
1897 F: Future<Output = Result<V>> + Send + 'static {
1898 self.0.spawn(rt.clone(), future)?;
1899
1900 if let Some(timeout) = timeout {
1901 let rt_copy = rt.clone();
1903 self.0.spawn(rt, async move {
1904 rt_copy.timeout(timeout).await;
1905
1906 Err(Error::new(ErrorKind::TimedOut, format!("Time out")))
1908 })
1909 } else {
1910 Ok(())
1912 }
1913 }
1914
1915 pub fn spawn_local<O, F>(&self,
1917 timeout: Option<usize>,
1918 future: F) -> Result<()>
1919 where O: Default + 'static,
1920 F: Future<Output = Result<V>> + Send + 'static {
1921 if let Some(rt) = local_async_runtime::<O>() {
1922 self.0.spawn_local(future)?;
1924
1925 if let Some(timeout) = timeout {
1926 let rt_copy = rt.clone();
1928 self.0.spawn_local(async move {
1929 rt_copy.timeout(timeout).await;
1930
1931 Err(Error::new(ErrorKind::TimedOut, format!("Time out")))
1933 })
1934 } else {
1935 Ok(())
1937 }
1938 } else {
1939 Err(Error::new(ErrorKind::Other, format!("Spawn wait task failed, reason: local async runtime not exist")))
1941 }
1942 }
1943}
1944
1945impl<V: Send + 'static> AsyncWait<V> {
1949 pub async fn wait_result(self) -> Result<V> {
1951 self.0.wait_result().await
1952 }
1953}
1954
1955pub struct AsyncWaitAny<V: Send + 'static> {
1959 capacity: usize, producor: AsyncSender<Result<V>>, consumer: AsyncReceiver<Result<V>>, }
1963
1964unsafe impl<V: Send + 'static> Send for AsyncWaitAny<V> {}
1965unsafe impl<V: Send + 'static> Sync for AsyncWaitAny<V> {}
1966
1967impl<V: Send + 'static> AsyncWaitAny<V> {
1971 pub fn spawn<RT, O, F>(&self,
1973 rt: RT,
1974 future: F) -> Result<()>
1975 where RT: AsyncRuntime<O>,
1976 O: Default + 'static,
1977 F: Future<Output = Result<V>> + Send + 'static {
1978 let producor = self.producor.clone();
1979 rt.spawn_by_id(rt.alloc::<O>(), async move {
1980 let value = future.await;
1981 producor.into_send_async(value).await;
1982
1983 Default::default()
1985 })
1986 }
1987
1988 pub fn spawn_local<F>(&self,
1990 future: F) -> Result<()>
1991 where F: Future<Output = Result<V>> + Send + 'static {
1992 if let Some(rt) = local_async_runtime() {
1993 let producor = self.producor.clone();
1995 rt.spawn(async move {
1996 let value = future.await;
1997 producor.into_send_async(value).await;
1998 })
1999 } else {
2000 Err(Error::new(ErrorKind::Other, format!("Spawn wait any task failed, reason: local async runtime not exist")))
2002 }
2003 }
2004}
2005
2006impl<V: Send + 'static> AsyncWaitAny<V> {
2010 pub async fn wait_result(self) -> Result<V> {
2012 match self.consumer.recv_async().await {
2013 Err(e) => {
2014 Err(Error::new(ErrorKind::Other, format!("Wait any result failed, reason: {:?}", e)))
2016 },
2017 Ok(result) => {
2018 result
2020 },
2021 }
2022 }
2023}
2024
2025pub struct AsyncWaitAnyCallback<V: Send + 'static> {
2029 capacity: usize, producor: AsyncSender<Result<V>>, consumer: AsyncReceiver<Result<V>>, }
2033
2034unsafe impl<V: Send + 'static> Send for AsyncWaitAnyCallback<V> {}
2035unsafe impl<V: Send + 'static> Sync for AsyncWaitAnyCallback<V> {}
2036
2037impl<V: Send + 'static> AsyncWaitAnyCallback<V> {
2041 pub fn spawn<RT, O, F>(&self,
2043 rt: RT,
2044 future: F) -> Result<()>
2045 where RT: AsyncRuntime<O>,
2046 O: Default + 'static,
2047 F: Future<Output = Result<V>> + Send + 'static {
2048 let producor = self.producor.clone();
2049 rt.spawn_by_id(rt.alloc::<O>(), async move {
2050 let value = future.await;
2051 producor.into_send_async(value).await;
2052
2053 Default::default()
2055 })
2056 }
2057
2058 pub fn spawn_local<F>(&self,
2060 future: F) -> Result<()>
2061 where F: Future<Output = Result<V>> + Send + 'static {
2062 if let Some(rt) = local_async_runtime() {
2063 let producor = self.producor.clone();
2065 rt.spawn(async move {
2066 let value = future.await;
2067 producor.into_send_async(value).await;
2068 })
2069 } else {
2070 Err(Error::new(ErrorKind::Other, format!("Spawn wait any task failed by callback, reason: current async runtime not exist")))
2072 }
2073 }
2074}
2075
2076impl<V: Send + 'static> AsyncWaitAnyCallback<V> {
2080 pub async fn wait_result(mut self,
2082 callback: impl Fn(&Result<V>) -> bool + Send + Sync + 'static) -> Result<V> {
2083 let checker = create_checker(self.capacity, callback);
2084 loop {
2085 match self.consumer.recv_async().await {
2086 Err(e) => {
2087 return Err(Error::new(ErrorKind::Other, format!("Wait any result failed by callback, reason: {:?}", e)));
2089 },
2090 Ok(result) => {
2091 if checker(&result) {
2093 return result;
2095 }
2096 },
2097 }
2098 }
2099 }
2100}
2101
2102fn create_checker<V, F>(len: usize,
2104 callback: F) -> Arc<dyn Fn(&Result<V>) -> bool + Send + Sync + 'static>
2105 where V: Send + 'static,
2106 F: Fn(&Result<V>) -> bool + Send + Sync + 'static {
2107 let mut check_counter = AtomicUsize::new(len); Arc::new(move |result| {
2109 if check_counter.fetch_sub(1, Ordering::SeqCst) == 1 {
2110 true
2112 } else {
2113 callback(result)
2115 }
2116 })
2117}
2118
2119pub struct AsyncMapReduce<V: Send + 'static> {
2123 count: usize, capacity: usize, producor: AsyncSender<(usize, Result<V>)>, consumer: AsyncReceiver<(usize, Result<V>)>, }
2128
2129unsafe impl<V: Send + 'static> Send for AsyncMapReduce<V> {}
2130
2131impl<V: Send + 'static> AsyncMapReduce<V> {
2135 pub fn map<RT, O, F>(&mut self, rt: RT, future: F) -> Result<usize>
2137 where RT: AsyncRuntime<O>,
2138 O: Default + 'static,
2139 F: Future<Output = Result<V>> + Send + 'static {
2140 if self.count >= self.capacity {
2141 return Err(Error::new(ErrorKind::Other, format!("Map task to runtime failed, capacity: {}, reason: out of capacity", self.capacity)));
2143 }
2144
2145 let index = self.count;
2146 let producor = self.producor.clone();
2147 rt.spawn_by_id(rt.alloc::<O>(), async move {
2148 let value = future.await;
2149 producor.into_send_async((index, value)).await;
2150
2151 Default::default()
2153 })?;
2154
2155 self.count += 1; Ok(index)
2157 }
2158}
2159
2160impl<V: Send + 'static> AsyncMapReduce<V> {
2164 pub async fn reduce(self, order: bool) -> Result<Vec<Result<V>>> {
2166 let mut count = self.count;
2167 let mut results = Vec::with_capacity(count);
2168 while count > 0 {
2169 match self.consumer.recv_async().await {
2170 Err(e) => {
2171 return Err(Error::new(ErrorKind::Other, format!("Reduce result failed, reason: {:?}", e)));
2173 },
2174 Ok((index, result)) => {
2175 results.push((index, result));
2177 count -= 1;
2178 },
2179 }
2180 }
2181
2182 if order {
2183 results.sort_by_key(|(key, _value)| {
2185 key.clone()
2186 });
2187 }
2188 let (_, values) = results
2189 .into_iter()
2190 .unzip::<usize, Result<V>, Vec<usize>, Vec<Result<V>>>();
2191
2192 Ok(values)
2193 }
2194}
2195
2196pub enum AsyncPipelineResult<O: 'static> {
2200 Disconnect, Filtered(O), }
2203
2204pub fn spawn_worker_thread<F0, F1>(thread_name: &str,
2210 thread_stack_size: usize,
2211 thread_handler: Arc<AtomicBool>,
2212 thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>, sleep_timeout: u64, loop_interval: Option<u64>, loop_func: F0,
2216 get_queue_len: F1) -> Arc<AtomicBool>
2217 where F0: Fn() -> (bool, Duration) + Send + 'static,
2218 F1: Fn() -> usize + Send + 'static {
2219 let thread_status_copy = thread_handler.clone();
2220
2221 thread::Builder::new()
2222 .name(thread_name.to_string())
2223 .stack_size(thread_stack_size).spawn(move || {
2224 let mut sleep_count = 0;
2225
2226 while thread_handler.load(Ordering::Relaxed) {
2227 let (is_no_task, run_time) = loop_func();
2228
2229 if is_no_task {
2230 if sleep_count > 1 {
2232 sleep_count = 0; let (is_sleep, lock, condvar) = &*thread_waker;
2235 let mut locked = lock.lock();
2236 if get_queue_len() > 0 {
2237 continue;
2239 }
2240
2241 if !is_sleep.load(Ordering::Relaxed) {
2242 is_sleep.store(true, Ordering::SeqCst);
2244 if condvar
2245 .wait_for(
2246 &mut locked,
2247 Duration::from_millis(sleep_timeout),
2248 )
2249 .timed_out()
2250 {
2251 is_sleep.store(false, Ordering::SeqCst);
2253 }
2254 }
2255
2256 continue; }
2258
2259 sleep_count += 1; if let Some(interval) = &loop_interval {
2261 if let Some(remaining_interval) = Duration::from_millis(*interval).checked_sub(run_time){
2263 thread::sleep(remaining_interval);
2265 }
2266 }
2267 } else {
2268 sleep_count = 0; if let Some(interval) = &loop_interval {
2271 if let Some(remaining_interval) = Duration::from_millis(*interval).checked_sub(run_time){
2273 thread::sleep(remaining_interval);
2275 }
2276 }
2277 }
2278 }
2279 });
2280
2281 thread_status_copy
2282}
2283
2284pub fn wakeup_worker_thread<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>(worker_waker: &Arc<(AtomicBool, Mutex<()>, Condvar)>, rt: &SingleTaskRuntime<O, P>) {
2286 if worker_waker.0.load(Ordering::Relaxed) && rt.len() > 0 {
2288 let (is_sleep, lock, condvar) = &**worker_waker;
2289 let _locked = lock.lock();
2290 is_sleep.store(false, Ordering::SeqCst); let _ = condvar.notify_one();
2292 }
2293}
2294
2295pub fn register_global_panic_handler<Handler>(handler: Handler)
2297 where Handler: Fn(thread::Thread, String, Option<String>, Option<(String, u32, u32)>) -> Option<i32> + Send + Sync + 'static {
2298 set_hook(Box::new(move |panic_info| {
2299 let thread_info = thread::current();
2300
2301 let payload = panic_info.payload();
2302 let payload_info = match payload.downcast_ref::<&str>() {
2303 None => {
2304 match payload.downcast_ref::<String>() {
2306 None => {
2307 "Unknow panic".to_string()
2309 },
2310 Some(info) => {
2311 info.clone()
2312 }
2313 }
2314 },
2315 Some(info) => {
2316 info.to_string()
2317 }
2318 };
2319
2320 let other_info = if let Some(arg) = panic_info.payload_as_str() {
2321 Some(arg.to_string())
2322 } else {
2323 None
2324 };
2325
2326 let location = if let Some(location) = panic_info.location() {
2327 Some((location.file().to_string(), location.line(), location.column()))
2328 } else {
2329 None
2330 };
2331
2332 if let Some(exit_code) = handler(thread_info, payload_info, other_info, location) {
2333 std::process::exit(exit_code);
2335 }
2336 }));
2337}
2338
2339pub fn replace_global_alloc_error_handler() {
2341 set_alloc_error_hook(global_alloc_error_handle);
2342}
2343
2344fn global_alloc_error_handle(layout: Layout) {
2345 let bt = Backtrace::new();
2346 eprintln!("[UTC: {}][Thread: {}]Global memory allocation of {:?} bytes failed, stacktrace: \n{:?}",
2347 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis(),
2348 thread::current().name().unwrap_or(""),
2349 layout.size(),
2350 bt);
2351}
2352
2353pub(crate) struct YieldNow(bool);
2355
2356impl Future for YieldNow {
2357 type Output = ();
2358
2359 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2360 if self.0 {
2361 Poll::Ready(())
2362 } else {
2363 self.0 = true;
2364 cx.waker().wake_by_ref();
2365 Poll::Pending
2366 }
2367 }
2368}