1use std::sync::Arc;
31use std::vec::IntoIter;
32use std::time::Duration;
33use std::future::Future;
34use std::cell::UnsafeCell;
35use std::marker::PhantomData;
36use std::io::{Error, ErrorKind, Result};
37use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
38use std::task::{Context, Poll, Waker};
39use std::thread::{self, Builder};
40
41use async_stream::stream;
42use crossbeam_channel::{bounded, Sender};
43use crossbeam_deque::{Injector, Steal, Stealer, Worker};
44use crossbeam_queue::{ArrayQueue, SegQueue};
45use st3::{StealError,
46 fifo::{Worker as FIFOWorker, Stealer as FIFOStealer}};
47use flume::bounded as async_bounded;
48use futures::{
49 future::{BoxFuture, FutureExt},
50 stream::{BoxStream, Stream, StreamExt},
51 task::waker_ref,
52 TryFuture,
53};
54use parking_lot::{Condvar, Mutex};
55use rand::{Rng, thread_rng};
56use num_cpus;
57use wrr::IWRRSelector;
58use quanta::{Clock, Instant as QInstant};
59use log::warn;
60
61use super::{
62 PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME, PI_ASYNC_THREAD_LOCAL_ID, DEFAULT_MAX_HIGH_PRIORITY_BOUNDED, DEFAULT_HIGH_PRIORITY_BOUNDED, DEFAULT_MAX_LOW_PRIORITY_BOUNDED, alloc_rt_uid, local_async_runtime, AsyncMapReduce, AsyncPipelineResult, AsyncRuntime,
63 AsyncRuntimeExt, AsyncTask, AsyncTaskPool, AsyncTaskPoolExt, AsyncTaskTimerByNotCancel, AsyncTimingTask,
64 AsyncWait, AsyncWaitAny, AsyncWaitAnyCallback, AsyncWaitTimeout, LocalAsyncWaitTimeout, LocalAsyncRuntime, TaskId, TaskHandle, YieldNow
65};
66
// Tuning constants for the multi-task runtimes defined in this module.

/// Minimum number of worker queues a pool is created with.
#[cfg(not(target_arch = "wasm32"))]
const DEFAULT_INIT_WORKER_SIZE: usize = 2;
/// wasm32 has no native threads, so a single worker suffices.
#[cfg(target_arch = "wasm32")]
const DEFAULT_INIT_WORKER_SIZE: usize = 1;

/// Name prefix for runtime worker threads (presumably applied when the
/// threads are spawned elsewhere in this module — confirm at the use site).
const DEFAULT_WORKER_THREAD_PREFIX: &str = "Default-Multi-RT";

/// Stack size for worker threads, in bytes (1 MiB).
const DEFAULT_THREAD_STACK_SIZE: usize = 1024 * 1024;

/// Idle sleep interval for worker threads — unit assumed to be
/// milliseconds; confirm at the use site (not visible in this chunk).
const DEFAULT_WORKER_THREAD_SLEEP_TIME: u64 = 10;

/// Idle sleep interval for the runtime — unit assumed to be milliseconds;
/// confirm at the use site (not visible in this chunk).
const DEFAULT_RUNTIME_SLEEP_TIME: u64 = 1000;

/// Upper clamp for weights fed to the IWRR selector (see `try_pop_by_weight`).
const DEFAULT_MAX_WEIGHT: u8 = 254;

/// Lower bound for selector weights.
const DEFAULT_MIN_WEIGHT: u8 = 1;
105struct ComputationalTaskQueue<O: Default + 'static> {
109 stack: Worker<Arc<AsyncTask<ComputationalTaskPool<O>, O>>>, queue: SegQueue<Arc<AsyncTask<ComputationalTaskPool<O>, O>>>, thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>, }
113
114impl<O: Default + 'static> ComputationalTaskQueue<O> {
115 pub fn new(thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) -> Self {
117 let stack = Worker::new_lifo();
118 let queue = SegQueue::new();
119
120 ComputationalTaskQueue {
121 stack,
122 queue,
123 thread_waker,
124 }
125 }
126
127 pub fn len(&self) -> usize {
129 self.stack.len() + self.queue.len()
130 }
131}
132
133pub struct ComputationalTaskPool<O: Default + 'static> {
137 workers: Vec<ComputationalTaskQueue<O>>, waits: Option<Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>>, consume_count: Arc<AtomicUsize>, produce_count: Arc<AtomicUsize>, }
142
// SAFETY(review): assumed sound because cross-thread access goes through
// atomics and the lock-free `SegQueue`/deque types, and each worker's own
// queues are indexed by thread id — confirm every field is actually safe to
// share before relying on these impls.
unsafe impl<O: Default + 'static> Send for ComputationalTaskPool<O> {}
unsafe impl<O: Default + 'static> Sync for ComputationalTaskPool<O> {}
145
146impl<O: Default + 'static> Default for ComputationalTaskPool<O> {
147 fn default() -> Self {
148 #[cfg(not(target_arch = "wasm32"))]
149 let core_len = num_cpus::get(); #[cfg(target_arch = "wasm32")]
151 let core_len = 1; ComputationalTaskPool::new(core_len)
153 }
154}
155
impl<O: Default + 'static> AsyncTaskPool<O> for ComputationalTaskPool<O> {
    type Pool = ComputationalTaskPool<O>;

    // Returns the current thread's packed id (runtime uid in the high bits,
    // worker index in the low 32 bits) from thread-local storage.
    // Panics when called from a thread the runtime did not initialize.
    #[inline]
    fn get_thread_id(&self) -> usize {
        match PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| unsafe { *thread_id.get() }) {
            Err(e) => {
                // This thread carries no runtime-assigned local id.
                panic!(
                    "Get thread id failed, thread: {:?}, reason: {:?}",
                    thread::current(),
                    e
                );
            }
            Ok(id) => id,
        }
    }

    // Approximate queued-task count: produced minus consumed, saturating at
    // zero (relaxed loads, so the value may be slightly stale).
    #[inline]
    fn len(&self) -> usize {
        if let Some(len) = self
            .produce_count
            .load(Ordering::Relaxed)
            .checked_sub(self.consume_count.load(Ordering::Relaxed))
        {
            len
        } else {
            0
        }
    }

    // Pushes a task onto a worker queue chosen round-robin; the produce
    // counter doubles as the round-robin cursor.
    #[inline]
    fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
        let index = self.produce_count.fetch_add(1, Ordering::Relaxed) % self.workers.len();
        self.workers[index].queue.push(task);
        Ok(())
    }

    // Pushes onto the current worker's FIFO queue when this thread belongs
    // to the task's owner runtime; otherwise falls back to `push`.
    #[inline]
    fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
        let id = self.get_thread_id();
        let rt_uid = task.owner();
        if (id >> 32) == rt_uid {
            let worker = &self.workers[id & 0xffffffff];
            worker.queue.push(task);

            self.produce_count.fetch_add(1, Ordering::Relaxed);
            Ok(())
        } else {
            self.push(task)
        }
    }

    // Routes by priority: max-high priority goes onto the local LIFO stack,
    // high priority onto the local FIFO queue, everything else round-robin.
    #[inline]
    fn push_priority(&self,
                     priority: usize,
                     task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
        if priority >= DEFAULT_MAX_HIGH_PRIORITY_BOUNDED {
            let id = self.get_thread_id();
            let rt_uid = task.owner();
            if (id >> 32) == rt_uid {
                let worker = &self.workers[id & 0xffffffff];
                worker.stack.push(task);

                self.produce_count.fetch_add(1, Ordering::Relaxed);
                Ok(())
            } else {
                self.push(task)
            }
        } else if priority >= DEFAULT_HIGH_PRIORITY_BOUNDED {
            self.push_local(task)
        } else {
            self.push(task)
        }
    }

    // Re-queues a task at the high-priority bound.
    #[inline]
    fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
        self.push_priority(DEFAULT_HIGH_PRIORITY_BOUNDED, task)
    }

    // Pops from the current worker only: LIFO stack first, then the FIFO
    // queue; bumps the consume counter on success. No work stealing here.
    #[inline]
    fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>> {
        let id = self.get_thread_id() & 0xffffffff;
        let worker = &self.workers[id];
        let task = worker.stack.pop();
        if task.is_some() {
            self.consume_count.fetch_add(1, Ordering::Relaxed);
            return task;
        }

        let task = worker.queue.pop();
        if task.is_some() {
            self.consume_count.fetch_add(1, Ordering::Relaxed);
        }

        task
    }

    // Drains everything reachable from the current worker into a vector
    // sized from the (approximate) pool length.
    #[inline]
    fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>> {
        let mut tasks = Vec::with_capacity(self.len());
        while let Some(task) = self.try_pop() {
            tasks.push(task);
        }

        tasks.into_iter()
    }

    // Per-worker wakers are exposed via `clone_thread_waker` instead.
    #[inline]
    fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>> {
        None
    }
}
277
278impl<O: Default + 'static> AsyncTaskPoolExt<O> for ComputationalTaskPool<O> {
279 #[inline]
280 fn set_waits(&mut self, waits: Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>) {
281 self.waits = Some(waits);
282 }
283
284 #[inline]
285 fn get_waits(&self) -> Option<&Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>> {
286 self.waits.as_ref()
287 }
288
289 #[inline]
290 fn worker_len(&self) -> usize {
291 self.workers.len()
292 }
293
294 #[inline]
295 fn clone_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
296 let worker = &self.workers[self.get_thread_id() & 0xffffffff];
297 Some(worker.thread_waker.clone())
298 }
299}
300
301impl<O: Default + 'static> ComputationalTaskPool<O> {
302 pub fn new(mut size: usize) -> Self {
304 if size < DEFAULT_INIT_WORKER_SIZE {
305 size = DEFAULT_INIT_WORKER_SIZE;
307 }
308
309 let mut workers = Vec::with_capacity(size);
310 for _ in 0..size {
311 let thread_waker = Arc::new((AtomicBool::new(false), Mutex::new(()), Condvar::new()));
312 let worker = ComputationalTaskQueue::new(thread_waker);
313 workers.push(worker);
314 }
315 let consume_count = Arc::new(AtomicUsize::new(0));
316 let produce_count = Arc::new(AtomicUsize::new(0));
317
318 ComputationalTaskPool {
319 workers,
320 waits: None,
321 consume_count,
322 produce_count,
323 }
324 }
325}
326
327struct StealableTaskQueue<O: Default + 'static> {
331 stack: UnsafeCell<Option<Arc<AsyncTask<StealableTaskPool<O>, O>>>>, internal: FIFOWorker<Arc<AsyncTask<StealableTaskPool<O>, O>>>, external: Worker<Arc<AsyncTask<StealableTaskPool<O>, O>>>, selector: UnsafeCell<IWRRSelector<2>>, thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>, }
337
impl<O: Default + 'static> StealableTaskQueue<O> {
    // Builds the per-worker queues and returns them together with the
    // stealer handles other workers use to take tasks from this one.
    pub fn new(
        init_queue_capacity: usize,
        thread_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>,
    ) -> (Self,
        FIFOStealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>,
        Stealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>) {
        let stack = UnsafeCell::new(None);
        let internal = FIFOWorker::new(init_queue_capacity);
        let external = Worker::new_fifo();
        let internal_stealer = internal.stealer();
        let external_stealer = external.stealer();
        // Selector slot 0 drives the external source, slot 1 the internal;
        // starts 2:1 in favor of external and is rebalanced periodically by
        // `try_pop_by_weight`.
        let selector = UnsafeCell::new(IWRRSelector::new([2, 1]));

        (
            StealableTaskQueue {
                stack,
                internal,
                external,
                selector,
                thread_waker,
            },
            internal_stealer,
            external_stealer
        )
    }

    // The one-slot "stack" can hold at most a single task.
    pub const fn stack_capacity(&self) -> usize {
        1
    }

    // Total capacity of the bounded internal (st3) queue.
    pub fn internal_capacity(&self) -> usize {
        self.internal.capacity()
    }

    // Free slots remaining in the bounded internal queue.
    pub fn remaining_internal_capacity(&self) -> usize {
        self.internal.spare_capacity()
    }

    // 1 when the one-slot stack is occupied, else 0.
    // SAFETY(review): reads the UnsafeCell without synchronization —
    // assumed to be called only from the owning worker thread; confirm.
    #[inline]
    pub fn stack_len(&self) -> usize {
        unsafe {
            if (&*self.stack.get()).is_some() {
                1
            } else {
                0
            }
        }
    }

    // Occupied slots in the internal queue (capacity minus free slots).
    pub fn internal_len(&self) -> usize {
        self
            .internal_capacity()
            .checked_sub(self.remaining_internal_capacity())
            .unwrap_or(0)
    }

    // Number of tasks in the external (crossbeam) queue.
    pub fn external_len(&self) -> usize {
        self.external.len()
    }
}
407
408pub struct StealableTaskPool<O: Default + 'static> {
412 public: Injector<Arc<AsyncTask<StealableTaskPool<O>, O>>>, workers: Vec<StealableTaskQueue<O>>, internal_stealers: Vec<FIFOStealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>>, external_stealers: Vec<Stealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>>, internal_consume: AtomicUsize, internal_produce: AtomicUsize, internal_traffic_statistics: AtomicUsize, external_consume: AtomicUsize, external_produce: AtomicUsize, external_traffic_statistics: AtomicUsize, weights: [u8; 2], clock: Clock, interval: usize, last_time: UnsafeCell<QInstant>, }
427
// SAFETY(review): assumed sound because counters are atomic and the queues
// are lock-free; the `UnsafeCell` fields (`last_time`, each worker's `stack`
// and `selector`) are only touched from the worker that owns them — confirm
// that invariant holds at every call site before relying on these impls.
unsafe impl<O: Default + 'static> Send for StealableTaskPool<O> {}
unsafe impl<O: Default + 'static> Sync for StealableTaskPool<O> {}
430
431impl<O: Default + 'static> Default for StealableTaskPool<O> {
432 fn default() -> Self {
433 StealableTaskPool::new()
434 }
435}
436
impl<O: Default + 'static> AsyncTaskPool<O> for StealableTaskPool<O> {
    type Pool = StealableTaskPool<O>;

    // Returns the current thread's packed id (runtime uid in the high bits,
    // worker index in the low 32 bits) from thread-local storage.
    // Panics when called from a thread the runtime did not initialize.
    #[inline]
    fn get_thread_id(&self) -> usize {
        match PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| unsafe { *thread_id.get() }) {
            Err(e) => {
                panic!(
                    "Get thread id failed, thread: {:?}, reason: {:?}",
                    thread::current(),
                    e
                );
            }
            Ok(id) => id,
        }
    }

    // Approximate pending-task count: internal plus external
    // produced-minus-consumed, each saturating at zero (relaxed loads).
    #[inline]
    fn len(&self) -> usize {
        self.internal_produce
            .load(Ordering::Relaxed)
            .checked_sub(self.internal_consume.load(Ordering::Relaxed))
            .unwrap_or(0)
            +
            self.external_produce
            .load(Ordering::Relaxed)
            .checked_sub(self.external_consume.load(Ordering::Relaxed))
            .unwrap_or(0)
    }

    // Pushes onto the shared public injector, from which any worker may
    // steal; counted as external traffic.
    #[inline]
    fn push(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
        self.public.push(task);

        self
            .external_produce
            .fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    // Pushes onto the current worker's bounded internal queue when this
    // thread belongs to the task's owner runtime and there is room;
    // otherwise falls back to the public injector.
    #[inline]
    fn push_local(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
        let id = self.get_thread_id();
        let rt_uid = task.owner();
        if (id >> 32) == rt_uid {
            let worker = &self.workers[id & 0xffffffff];
            if worker.remaining_internal_capacity() > 0 {
                let _ = worker.internal.push(task);

                self
                    .internal_produce
                    .fetch_add(1, Ordering::Relaxed);
                Ok(())
            } else {
                self.push(task)
            }
        } else {
            self.push(task)
        }
    }

    // Routes by priority: max-high priority tries the one-slot stack, then
    // the internal queue, then the public injector; high priority goes
    // through `push_local`; everything else to the public injector.
    #[inline]
    fn push_priority(&self,
                     priority: usize,
                     task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
        if priority >= DEFAULT_MAX_HIGH_PRIORITY_BOUNDED {
            let id = self.get_thread_id();
            let rt_uid = task.owner();
            if (id >> 32) == rt_uid {
                let worker = &self.workers[id & 0xffffffff];
                if worker.stack_len() < 1 {
                    // SAFETY(review): unsynchronized write to the one-slot
                    // stack — assumed only the owning thread reaches here.
                    unsafe {
                        *worker.stack.get() = Some(task);
                    }
                } else if worker.remaining_internal_capacity() > 0 {
                    let _ = worker.internal.push(task);
                } else {
                    return self.push(task);
                }

                self
                    .internal_produce
                    .fetch_add(1, Ordering::Relaxed);
                Ok(())
            } else {
                self.push(task)
            }
        } else if priority >= DEFAULT_HIGH_PRIORITY_BOUNDED {
            self.push_local(task)
        } else {
            self.push(task)
        }
    }

    // Re-queues a task at the high-priority bound.
    #[inline]
    fn push_keep(&self, task: Arc<AsyncTask<Self::Pool, O>>) -> Result<()> {
        self.push_priority(DEFAULT_HIGH_PRIORITY_BOUNDED, task)
    }

    // Pops for the current worker: the one-slot stack wins outright, then
    // the weighted internal/external selection (with stealing) is tried.
    // NOTE(review): taking from the stack does not bump `internal_consume`
    // although filling it bumped `internal_produce`, so `len()` can drift
    // upward — confirm whether this is intended.
    #[inline]
    fn try_pop(&self) -> Option<Arc<AsyncTask<Self::Pool, O>>> {
        let id = self.get_thread_id() & 0xffffffff;
        let worker = &self.workers[id];
        let task = unsafe { (&mut *worker
            .stack
            .get())
            .take()
        };
        if task.is_some() {
            return task;
        }

        try_pop_by_weight(self, worker, id)
    }

    // Drains every task this worker can reach (including steals) into a
    // vector sized from the approximate pool length.
    #[inline]
    fn try_pop_all(&self) -> IntoIter<Arc<AsyncTask<Self::Pool, O>>> {
        let mut tasks = Vec::with_capacity(self.len());
        while let Some(task) = self.try_pop() {
            tasks.push(task);
        }

        tasks.into_iter()
    }

    // Per-worker wakers are exposed via `clone_thread_waker` instead.
    #[inline]
    fn get_thread_waker(&self) -> Option<&Arc<(AtomicBool, Mutex<()>, Condvar)>> {
        None
    }
}
583
/// Returns the 1-based position of the most significant set bit of `n`
/// (i.e. its bit length); 0 when `n == 0`.
const fn get_msb(n: usize) -> usize {
    (usize::BITS - n.leading_zeros()) as usize
}
588
// Pops a task for `local_worker`, arbitrating between the external and
// internal task sources with the worker's interleaved weighted round-robin
// selector. Every `pool.interval` milliseconds the selector weights are
// recomputed from the produce-counter deltas observed since the previous
// rebalance, so the busier source is polled more often.
fn try_pop_by_weight<O: Default + 'static>(pool: &StealableTaskPool<O>,
                                           local_worker: &StealableTaskQueue<O>,
                                           local_worker_id: usize)
    -> Option<Arc<AsyncTask<StealableTaskPool<O>, O>>> {
    // SAFETY(review): dereferences the `last_time` and `selector`
    // UnsafeCells without synchronization — assumed each worker only touches
    // its own queue from its own thread; confirm.
    unsafe {
        let duration = pool
            .clock
            .recent()
            .duration_since(*pool.last_time.get())
            .as_millis() as usize;
        if duration >= pool.interval {
            // Rebalance window elapsed: snapshot the produce counters.
            let new_external_traffic_statistics = pool
                .external_produce
                .load(Ordering::Relaxed);
            let new_internal_traffic_statistics = pool
                .internal_produce
                .load(Ordering::Relaxed);

            // Delta since the last snapshot, clamped to at least 1 so the
            // weight math below never shifts a zero.
            let external_delta = if new_external_traffic_statistics == 0 {
                1
            } else {
                new_external_traffic_statistics
                    .checked_sub(pool
                        .external_traffic_statistics
                        .load(Ordering::Relaxed))
                    .unwrap_or(1)
            };
            pool
                .external_traffic_statistics
                .store(new_external_traffic_statistics, Ordering::Relaxed); let internal_delta = if new_internal_traffic_statistics == 0 {
                1
            } else {
                new_internal_traffic_statistics
                    .checked_sub(pool
                        .internal_traffic_statistics
                        .load(Ordering::Relaxed))
                    .unwrap_or(1)
            };
            pool
                .internal_traffic_statistics
                .store(new_internal_traffic_statistics, Ordering::Relaxed); let selector = &mut *local_worker.selector.get();
            // Scale both deltas by the smaller side's bit length so the
            // resulting weights stay within [1, DEFAULT_MAX_WEIGHT].
            if external_delta > internal_delta {
                let msb = get_msb(internal_delta);
                let internal_weight
                    = (internal_delta >> msb.checked_sub(2).unwrap_or(0)).max(1);
                let external_weight
                    = ((external_delta >> msb).min(DEFAULT_MAX_WEIGHT as usize)).max(1);

                // Selector slot 0 drives the external source, slot 1 internal.
                selector.change_weight(0, external_weight as u8);
                selector.change_weight(1, internal_weight as u8);
            } else if external_delta < internal_delta {
                let msb = get_msb(external_delta);
                let external_weight
                    = (external_delta >> msb.checked_sub(2).unwrap_or(0)).max(1);
                let internal_weight
                    = ((internal_delta >> msb).min(DEFAULT_MAX_WEIGHT as usize)).max(1);

                selector.change_weight(0, external_weight as u8);
                selector.change_weight(1, internal_weight as u8);
            } else {
                // Balanced traffic: reset both sides to equal weight.
                selector.change_weight(0, 1);
                selector.change_weight(1, 1);
            }

            *pool.last_time.get() = pool.clock.recent(); }

        // Poll the selected source first, fall back to the other, so a task
        // is returned whenever either source has work.
        match (&mut *local_worker.selector.get()).select() {
            0 => {
                let task = try_pop_external(pool, local_worker, local_worker_id);
                if task.is_some() {
                    task
                } else {
                    try_pop_internal(pool, local_worker, local_worker_id)
                }
            },
            _ => {
                let task = try_pop_internal(pool, local_worker, local_worker_id);
                if task.is_some() {
                    task
                } else {
                    try_pop_external(pool, local_worker, local_worker_id)
                }
            },
        }
    }
}
695
// Pops from the worker's own internal queue; when empty, tries to steal a
// batch (up to half the victim's tasks, bounded by local free capacity)
// from the other workers' internal queues, visited in random order to
// spread contention.
#[inline]
fn try_pop_internal<O: Default + 'static>(pool: &StealableTaskPool<O>,
                                          local_worker: &StealableTaskQueue<O>,
                                          local_worker_id: usize)
    -> Option<Arc<AsyncTask<StealableTaskPool<O>, O>>> {
    let task = local_worker
        .internal
        .pop();
    if task.is_some() {
        pool
            .internal_consume
            .fetch_add(1, Ordering::Relaxed);
        task
    } else {
        // Candidate victims: every worker except ourselves.
        let mut gen = thread_rng();
        let mut worker_stealers: Vec<&FIFOStealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>> = pool
            .internal_stealers
            .iter()
            .enumerate()
            .filter_map(|(index, other)| {
                if index != local_worker_id {
                    Some(other)
                } else {
                    None
                }
            })
            .collect();

        let remaining_len = local_worker.remaining_internal_capacity();
        loop {
            if worker_stealers.len() == 0 {
                break;
            }

            // Pick a random victim and remove it from the candidate list.
            let index = gen.gen_range(0..worker_stealers.len());
            let worker_stealer = worker_stealers.swap_remove(index);

            match worker_stealer.steal_and_pop(&local_worker.internal,
                                               |count| {
                                                   // Take half the victim's tasks (at least
                                                   // one), never more than our free capacity.
                                                   let stealable_len = count / 2;
                                                   if stealable_len <= remaining_len {
                                                       if stealable_len == 0 {
                                                           1
                                                       } else {
                                                           stealable_len
                                                       }
                                                   } else {
                                                       remaining_len
                                                   }
                                               }) {
                Err(StealError::Empty) => {
                    // Victim had nothing; try the next one.
                    continue;
                },
                Err(StealError::Busy) => {
                    // Lost a race with another stealer; try the next victim.
                    continue;
                },
                Ok((task, _)) => {
                    pool.internal_consume.fetch_add(1, Ordering::Relaxed);
                    return Some(task);
                },
            }
        }

        None
    }
}
773
// Pops from the worker's own external queue; when empty, refills from the
// shared public injector, and as a last resort steals a batch from another
// worker's external queue, visiting victims in random order.
#[inline]
fn try_pop_external<O: Default + 'static>(pool: &StealableTaskPool<O>,
                                          local_worker: &StealableTaskQueue<O>,
                                          local_worker_id: usize)
    -> Option<Arc<AsyncTask<StealableTaskPool<O>, O>>> {
    let task = local_worker
        .external
        .pop();
    if task.is_some() {
        pool
            .external_consume
            .fetch_add(1, Ordering::Relaxed);
        task
    } else {
        // Refill the local external queue from the public injector.
        let task = try_pop_public(pool, local_worker);
        if task.is_some() {
            // NOTE(review): `try_pop_public` already bumps
            // `external_consume` on success, so this second increment looks
            // like double counting — confirm whether it is intended.
            pool
                .external_consume
                .fetch_add(1, Ordering::Relaxed);
            task
        } else {
            // Candidate victims: every worker except ourselves.
            let mut gen = thread_rng();
            let mut worker_stealers: Vec<&Stealer<Arc<AsyncTask<StealableTaskPool<O>, O>>>> = pool
                .external_stealers
                .iter()
                .enumerate()
                .filter_map(|(index, other)| {
                    if index != local_worker_id {
                        Some(other)
                    } else {
                        None
                    }
                })
                .collect();

            loop {
                if worker_stealers.len() == 0 {
                    break;
                }

                // Pick a random victim and remove it from the candidates.
                let index = gen.gen_range(0..worker_stealers.len());
                let worker_stealer = worker_stealers.swap_remove(index);

                match worker_stealer.steal_batch_and_pop(&local_worker.external) {
                    Steal::Success(task) => {
                        pool.external_consume.fetch_add(1, Ordering::Relaxed);
                        return Some(task);
                    },
                    Steal::Retry => {
                        // Lost a race; try the next victim.
                        continue;
                    },
                    Steal::Empty => {
                        continue;
                    },
                }
            }

            None
        }
    }
}
846
// Steals a batch from the shared public injector into the worker's external
// queue and returns one task; retries on contention, `None` when the
// injector is empty.
#[inline]
fn try_pop_public<O: Default + 'static>(pool: &StealableTaskPool<O>,
                                        local_worker: &StealableTaskQueue<O>)
    -> Option<Arc<AsyncTask<StealableTaskPool<O>, O>>> {
    loop {
        match pool.public.steal_batch_and_pop(&local_worker.external) {
            Steal::Empty => {
                return None;
            },
            Steal::Retry => {
                // Transient contention on the injector; retry immediately.
                continue;
            },
            Steal::Success(task) => {
                pool.external_consume.fetch_add(1, Ordering::Relaxed);
                return Some(task);
            },
        }
    }
}
870
871impl<O: Default + 'static> AsyncTaskPoolExt<O> for StealableTaskPool<O> {
872 #[inline]
873 fn worker_len(&self) -> usize {
874 self.workers.len()
875 }
876
877 #[inline]
878 fn clone_thread_waker(&self) -> Option<Arc<(AtomicBool, Mutex<()>, Condvar)>> {
879 if let Some(worker) = self.workers.get(self.get_thread_id() & 0xffffffff) {
880 return Some(worker.thread_waker.clone());
881 }
882
883 None
884 }
885}
886
887impl<O: Default + 'static> StealableTaskPool<O> {
888 pub fn new() -> Self {
890 #[cfg(not(target_arch = "wasm32"))]
891 let size = num_cpus::get_physical() * 2; #[cfg(target_arch = "wasm32")]
893 let size = 1; StealableTaskPool::with(size,
895 0x8000,
896 [1, 1],
897 3000)
898 }
899
900 pub fn with(worker_size: usize,
902 internal_queue_capacity: usize,
903 weights: [u8; 2],
904 interval: usize) -> Self {
905 if worker_size == 0 {
906 panic!(
908 "Create WorkerTaskPool failed, worker size: {}, reason: invalid worker size",
909 worker_size
910 );
911 }
912 if interval == 0 {
913 panic!(
914 "Create WorkerTaskPool failed, interval: {}, reason: invalid interval",
915 worker_size
916 );
917 }
918
919 let public = Injector::new();
920 let mut workers = Vec::with_capacity(worker_size);
921 let mut internal_stealers = Vec::with_capacity(worker_size);
922 let mut external_stealers = Vec::with_capacity(worker_size);
923 for _ in 0..worker_size {
924 let thread_waker = Arc::new((AtomicBool::new(false), Mutex::new(()), Condvar::new()));
926 let (worker,
927 internal_stealer,
928 external_stealer) =
929 StealableTaskQueue::new(internal_queue_capacity,
930 thread_waker);
931 workers.push(worker);
932 internal_stealers.push(internal_stealer);
933 external_stealers.push(external_stealer);
934 }
935 let internal_consume = AtomicUsize::new(0);
936 let internal_produce = AtomicUsize::new(0);
937 let internal_traffic_statistics = AtomicUsize::new(0);
938 let external_consume = AtomicUsize::new(0);
939 let external_produce = AtomicUsize::new(0);
940 let external_traffic_statistics = AtomicUsize::new(0);
941 let clock = Clock::new();
942 let last_time = UnsafeCell::new(clock.recent());
943
944 StealableTaskPool {
945 public,
946 workers,
947 internal_stealers,
948 external_stealers,
949 internal_consume,
950 internal_produce,
951 internal_traffic_statistics,
952 external_consume,
953 external_produce,
954 external_traffic_statistics,
955 weights,
956 clock,
957 interval,
958 last_time,
959 }
960 }
961}
962
963pub struct MultiTaskRuntime<
967 O: Default + 'static = (),
968 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = StealableTaskPool<O>,
969>(
970 Arc<(
971 usize, Arc<P>, Option<
974 Vec<(
975 Sender<(usize, AsyncTimingTask<P, O>)>,
976 Arc<AsyncTaskTimerByNotCancel<P, O>>,
977 )>,
978 >, AtomicUsize, Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>, AtomicUsize, AtomicUsize, )>,
984);
985
// SAFETY(review): the inner tuple is shared behind an `Arc` and mutated only
// through atomics, channels, and the pool `P` — these impls effectively rely
// on `P`'s own (also hand-written) Send/Sync claims; confirm.
unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Send
    for MultiTaskRuntime<O, P>
{
}
unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Sync
    for MultiTaskRuntime<O, P>
{
}
994
995impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Clone
996 for MultiTaskRuntime<O, P>
997{
998 fn clone(&self) -> Self {
999 MultiTaskRuntime(self.0.clone())
1000 }
1001}
1002
1003impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>> AsyncRuntime<O>
1004 for MultiTaskRuntime<O, P>
1005{
1006 type Pool = P;
1007
1008 fn shared_pool(&self) -> Arc<Self::Pool> {
1010 (self.0).1.clone()
1011 }
1012
1013 fn get_id(&self) -> usize {
1015 (self.0).0
1016 }
1017
1018 fn wait_len(&self) -> usize {
1020 (self.0)
1021 .5
1022 .load(Ordering::Relaxed)
1023 .checked_sub((self.0).6.load(Ordering::Relaxed))
1024 .unwrap_or(0)
1025 }
1026
1027 fn len(&self) -> usize {
1029 (self.0).1.len()
1030 }
1031
1032 fn alloc<R: 'static>(&self) -> TaskId {
1034 TaskId(UnsafeCell::new((TaskHandle::<R>::default().into_raw() as u128) << 64 | self.get_id() as u128 & 0xffffffffffffffff))
1035 }
1036
1037 fn spawn<F>(&self, future: F) -> Result<TaskId>
1039 where
1040 F: Future<Output = O> + Send + 'static,
1041 {
1042 let task_id = self.alloc::<F::Output>();
1043 if let Err(e) = self.spawn_by_id(task_id.clone(), future) {
1044 return Err(e);
1045 }
1046
1047 Ok(task_id)
1048 }
1049
1050 fn spawn_local<F>(&self, future: F) -> Result<TaskId>
1052 where
1053 F: Future<Output=O> + Send + 'static {
1054 let task_id = self.alloc::<F::Output>();
1055 if let Err(e) = self.spawn_local_by_id(task_id.clone(), future) {
1056 return Err(e);
1057 }
1058
1059 Ok(task_id)
1060 }
1061
1062 fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
1064 where
1065 F: Future<Output=O> + Send + 'static {
1066 let task_id = self.alloc::<F::Output>();
1067 if let Err(e) = self.spawn_priority_by_id(task_id.clone(), priority, future) {
1068 return Err(e);
1069 }
1070
1071 Ok(task_id)
1072 }
1073
1074 fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
1076 where
1077 F: Future<Output=O> + Send + 'static {
1078 let task_id = self.alloc::<F::Output>();
1079 if let Err(e) = self.spawn_yield_by_id(task_id.clone(), future) {
1080 return Err(e);
1081 }
1082
1083 Ok(task_id)
1084 }
1085
1086 fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
1088 where
1089 F: Future<Output = O> + Send + 'static,
1090 {
1091 let task_id = self.alloc::<F::Output>();
1092 if let Err(e) = self.spawn_timing_by_id(task_id.clone(), future, time) {
1093 return Err(e);
1094 }
1095
1096 Ok(task_id)
1097 }
1098
1099 fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
1101 where
1102 F: Future<Output=O> + Send + 'static {
1103 let result = {
1104 (self.0).1.push(Arc::new(AsyncTask::new(
1105 task_id,
1106 (self.0).1.clone(),
1107 DEFAULT_MAX_LOW_PRIORITY_BOUNDED,
1108 Some(future.boxed()),
1109 )))
1110 };
1111
1112 if let Some(worker_waker) = (self.0).4.pop() {
1113 let (is_sleep, lock, condvar) = &*worker_waker;
1115 let _locked = lock.lock();
1116 if is_sleep.load(Ordering::Relaxed) {
1117 is_sleep.store(false, Ordering::SeqCst); condvar.notify_one();
1120 }
1121 }
1122
1123 result
1124 }
1125
1126 fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
1127 where
1128 F: Future<Output=O> + Send + 'static {
1129 (self.0).1.push_local(Arc::new(AsyncTask::new(
1130 task_id,
1131 (self.0).1.clone(),
1132 DEFAULT_HIGH_PRIORITY_BOUNDED,
1133 Some(future.boxed()),
1134 )))
1135 }
1136
1137 fn spawn_priority_by_id<F>(&self,
1139 task_id: TaskId,
1140 priority: usize,
1141 future: F) -> Result<()>
1142 where
1143 F: Future<Output=O> + Send + 'static {
1144 let result = {
1145 (self.0).1.push_priority(priority, Arc::new(AsyncTask::new(
1146 task_id,
1147 (self.0).1.clone(),
1148 priority,
1149 Some(future.boxed()),
1150 )))
1151 };
1152
1153 if let Some(worker_waker) = (self.0).4.pop() {
1154 let (is_sleep, lock, condvar) = &*worker_waker;
1156 let _locked = lock.lock();
1157 if is_sleep.load(Ordering::Relaxed) {
1158 is_sleep.store(false, Ordering::SeqCst); condvar.notify_one();
1161 }
1162 }
1163
1164 result
1165 }
1166
1167 #[inline]
1169 fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
1170 where
1171 F: Future<Output=O> + Send + 'static {
1172 self.spawn_priority_by_id(task_id,
1173 DEFAULT_HIGH_PRIORITY_BOUNDED,
1174 future)
1175 }
1176
1177 fn spawn_timing_by_id<F>(&self,
1179 task_id: TaskId,
1180 future: F,
1181 time: usize) -> Result<()>
1182 where
1183 F: Future<Output=O> + Send + 'static {
1184 let rt = self.clone();
1185 self.spawn_by_id(task_id, async move {
1186 if let Some(timers) = &(rt.0).2 {
1187 let id = (rt.0).1.get_thread_id() & 0xffffffff;
1189 let (_, timer) = &timers[id];
1190 timer.set_timer(
1191 AsyncTimingTask::WaitRun(Arc::new(AsyncTask::new(
1192 rt.alloc::<F::Output>(),
1193 (rt.0).1.clone(),
1194 DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
1195 Some(future.boxed()),
1196 ))),
1197 time,
1198 );
1199
1200 (rt.0).5.fetch_add(1, Ordering::Relaxed);
1201 }
1202
1203 Default::default()
1204 })
1205 }
1206
1207 fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output> {
1209 task_id.set_waker::<Output>(waker);
1210 Poll::Pending
1211 }
1212
1213 fn wakeup<Output: 'static>(&self, task_id: &TaskId) {
1215 task_id.wakeup::<Output>();
1216 }
1217
1218 fn wait<V: Send + 'static>(&self) -> AsyncWait<V> {
1220 AsyncWait(self.wait_any(2))
1221 }
1222
1223 fn wait_any<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
1225 let (producor, consumer) = async_bounded(capacity);
1226
1227 AsyncWaitAny {
1228 capacity,
1229 producor,
1230 consumer,
1231 }
1232 }
1233
1234 fn wait_any_callback<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
1236 let (producor, consumer) = async_bounded(capacity);
1237
1238 AsyncWaitAnyCallback {
1239 capacity,
1240 producor,
1241 consumer,
1242 }
1243 }
1244
1245 fn map_reduce<V: Send + 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
1247 let (producor, consumer) = async_bounded(capacity);
1248
1249 AsyncMapReduce {
1250 count: 0,
1251 capacity,
1252 producor,
1253 consumer,
1254 }
1255 }
1256
1257 fn timeout(&self, timeout: usize) -> BoxFuture<'static, ()> {
1259 let rt = self.clone();
1260
1261 if let Some(timers) = &(self.0).2 {
1262 match PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| {
1264 let thread_id = unsafe { *thread_id.get() };
1266 let index = thread_id & 0xffffffff;
1267 if index > timers.len() {
1268 TimerTaskProducor::Foreign(timers[(self.0).3.load(Ordering::Relaxed) % timers.len()].0.clone())
1270 } else {
1271 TimerTaskProducor::Local(timers[index].1.clone())
1272 }
1273 }) {
1274 Err(_) => {
1275 panic!("Multi thread runtime timeout failed, reason: local thread id not match")
1276 }
1277 Ok(producor) => match producor {
1278 TimerTaskProducor::Local(timer) => {
1279 LocalAsyncWaitTimeout::new(rt, timer, timeout).boxed()
1280 },
1281 TimerTaskProducor::Foreign(producor) => {
1282 AsyncWaitTimeout::new(rt, producor, timeout).boxed()
1283 },
1284 },
1285 }
1286 } else {
1287 async move {
1289 thread::sleep(Duration::from_millis(timeout as u64));
1290 }
1291 .boxed()
1292 }
1293 }
1294
1295 fn yield_now(&self) -> BoxFuture<'static, ()> {
1297 async move {
1298 YieldNow(false).await;
1299 }.boxed()
1300 }
1301
1302 fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> BoxStream<'static, FO>
1304 where
1305 S: Stream<Item = SO> + Send + 'static,
1306 SO: Send + 'static,
1307 F: FnMut(SO) -> AsyncPipelineResult<FO> + Send + 'static,
1308 FO: Send + 'static,
1309 {
1310 let output = stream! {
1311 for await value in input {
1312 match filter(value) {
1313 AsyncPipelineResult::Disconnect => {
1314 break;
1316 },
1317 AsyncPipelineResult::Filtered(result) => {
1318 yield result;
1319 },
1320 }
1321 }
1322 };
1323
1324 output.boxed()
1325 }
1326
1327 fn close(&self) -> bool {
1329 false
1330 }
1331}
1332
1333impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>> AsyncRuntimeExt<O>
1334 for MultiTaskRuntime<O, P>
1335{
1336 fn spawn_with_context<F, C>(&self, task_id: TaskId, future: F, context: C) -> Result<()>
1337 where
1338 F: Future<Output = O> + Send + 'static,
1339 C: 'static,
1340 {
1341 let task = Arc::new(AsyncTask::with_context(
1342 task_id,
1343 (self.0).1.clone(),
1344 DEFAULT_MAX_LOW_PRIORITY_BOUNDED,
1345 Some(future.boxed()),
1346 context,
1347 ));
1348 let result = (self.0).1.push(task);
1349
1350 if let Some(worker_waker) = (self.0).4.pop() {
1351 let (is_sleep, lock, condvar) = &*worker_waker;
1353 let _locked = lock.lock();
1354 if is_sleep.load(Ordering::Relaxed) {
1355 is_sleep.store(false, Ordering::SeqCst); condvar.notify_one();
1358 }
1359 }
1360
1361 result
1362 }
1363
1364 fn spawn_timing_with_context<F, C>(
1365 &self,
1366 task_id: TaskId,
1367 future: F,
1368 context: C,
1369 time: usize,
1370 ) -> Result<()>
1371 where
1372 F: Future<Output = O> + Send + 'static,
1373 C: Send + 'static,
1374 {
1375 let rt = self.clone();
1376 self.spawn_by_id(task_id, async move {
1377 if let Some(timers) = &(rt.0).2 {
1378 let id = (rt.0).1.get_thread_id() & 0xffffffff;
1380 let (_, timer) = &timers[id];
1381 timer.set_timer(
1382 AsyncTimingTask::WaitRun(Arc::new(AsyncTask::with_context(
1383 rt.alloc::<F::Output>(),
1384 (rt.0).1.clone(),
1385 DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
1386 Some(future.boxed()),
1387 context,
1388 ))),
1389 time,
1390 );
1391
1392 (rt.0).5.fetch_add(1, Ordering::Relaxed);
1393 }
1394
1395 Default::default()
1396 })
1397 }
1398
1399 fn block_on<F>(&self, future: F) -> Result<F::Output>
1400 where
1401 F: Future + Send + 'static,
1402 <F as Future>::Output: Default + Send + 'static,
1403 {
1404 if let Some(local_rt) = local_async_runtime::<F::Output>() {
1406 if local_rt.get_id() == self.get_id() {
1408 return Err(Error::new(
1410 ErrorKind::WouldBlock,
1411 format!("Block on failed, reason: would block"),
1412 ));
1413 }
1414 }
1415
1416 let (sender, receiver) = bounded(1);
1417 if let Err(e) = self.spawn(async move {
1418 let r = future.await;
1420 sender.send(r);
1421
1422 Default::default()
1423 }) {
1424 return Err(Error::new(
1425 ErrorKind::Other,
1426 format!("Block on failed, reason: {:?}", e),
1427 ));
1428 }
1429
1430 match receiver.recv() {
1432 Err(e) => Err(Error::new(
1433 ErrorKind::Other,
1434 format!("Block on failed, reason: {:?}", e),
1435 )),
1436 Ok(result) => Ok(result),
1437 }
1438 }
1439}
1440
impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>
    MultiTaskRuntime<O, P>
{
    /// Number of currently idle workers, as reported by the task pool.
    pub fn idler_len(&self) -> usize {
        (self.0).1.idler_len()
    }

    /// Number of worker threads, as reported by the task pool.
    pub fn worker_len(&self) -> usize {
        (self.0).1.worker_len()
    }

    /// Number of buffered tasks, as reported by the task pool.
    pub fn buffer_len(&self) -> usize {
        (self.0).1.buffer_len()
    }

    /// Builds a type-erased `LocalAsyncRuntime` facade over this runtime,
    /// pairing a raw pointer to the shared inner state with free-function
    /// entry points that reconstruct the runtime on each call.
    pub fn to_local_runtime(&self) -> LocalAsyncRuntime<O> {
        LocalAsyncRuntime {
            inner: self.as_raw(),
            get_id_func: MultiTaskRuntime::<O, P>::get_id_raw,
            spawn_func: MultiTaskRuntime::<O, P>::spawn_raw,
            spawn_timing_func: MultiTaskRuntime::<O, P>::spawn_timing_raw,
            timeout_func: MultiTaskRuntime::<O, P>::timeout_raw,
        }
    }

    /// Leaks one strong reference to the inner state and returns it as an
    /// opaque pointer. The pointer owns that reference; it is balanced by
    /// `from_raw` (or never released, keeping the runtime alive — intended
    /// for thread-local storage that lives as long as the thread).
    #[inline]
    pub(crate) fn as_raw(&self) -> *const () {
        Arc::into_raw(self.0.clone()) as *const ()
    }

    /// Reconstructs a runtime from a pointer produced by `as_raw`, taking
    /// over the strong reference that pointer owned.
    #[inline]
    pub(crate) fn from_raw(raw: *const ()) -> Self {
        // SAFETY: NOTE(review) — sound only if `raw` came from `as_raw` of a
        // runtime with the *same* `O` and `P`; the tuple type below must stay
        // in sync with the one built in `MultiTaskRuntimeBuilder::build`.
        let inner = unsafe {
            Arc::from_raw(
                raw as *const (
                    usize,
                    Arc<P>,
                    Option<
                        Vec<(
                            Sender<(usize, AsyncTimingTask<P, O>)>,
                            Arc<AsyncTaskTimerByNotCancel<P, O>>,
                        )>,
                    >,
                    AtomicUsize,
                    Arc<ArrayQueue<Arc<(AtomicBool, Mutex<()>, Condvar)>>>,
                    AtomicUsize,
                    AtomicUsize,
                ),
            )
        };
        MultiTaskRuntime(inner)
    }

    /// `get_id` entry point for `LocalAsyncRuntime`.
    pub(crate) fn get_id_raw(raw: *const ()) -> usize {
        let rt = MultiTaskRuntime::<O, P>::from_raw(raw);
        let id = rt.get_id();
        // Re-leak the reference so the caller's raw pointer stays valid
        // (deliberate `mem::forget`-style balance for `from_raw` above).
        Arc::into_raw(rt.0); id
    }

    /// `spawn` entry point for `LocalAsyncRuntime`.
    pub(crate) fn spawn_raw<F>(raw: *const (), future: F) -> Result<()>
    where
        F: Future<Output = O> + Send + 'static,
    {
        let rt = MultiTaskRuntime::<O, P>::from_raw(raw);
        let result = rt.spawn_by_id(rt.alloc::<F::Output>(), future);
        // Re-leak the reference so the caller's raw pointer stays valid.
        Arc::into_raw(rt.0); result
    }

    /// Timed-spawn entry point for `LocalAsyncRuntime`.
    pub(crate) fn spawn_timing_raw(
        raw: *const (),
        future: BoxFuture<'static, O>,
        timeout: usize,
    ) -> Result<()> {
        let rt = MultiTaskRuntime::<O, P>::from_raw(raw);
        let result = rt.spawn_timing_by_id(rt.alloc::<O>(), future, timeout);
        // Re-leak the reference so the caller's raw pointer stays valid.
        Arc::into_raw(rt.0); result
    }

    /// Timeout-future entry point for `LocalAsyncRuntime`.
    pub(crate) fn timeout_raw(raw: *const (), timeout: usize) -> BoxFuture<'static, ()> {
        let rt = MultiTaskRuntime::<O, P>::from_raw(raw);
        let boxed = rt.timeout(timeout);
        // Re-leak the reference so the caller's raw pointer stays valid.
        Arc::into_raw(rt.0); boxed
    }
}
1539
/// Builder for a `MultiTaskRuntime`: configures the task pool, worker thread
/// naming, worker counts, stack size, idle sleep timeout and timer interval.
pub struct MultiTaskRuntimeBuilder<
    O: Default + 'static = (),
    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = StealableTaskPool<O>,
> {
    // Task pool shared by all worker threads.
    pool: P,
    // Prefix used when naming worker threads.
    prefix: String,
    // Number of worker threads spawned at startup.
    init: usize,
    // Minimum number of worker threads.
    min: usize,
    // Maximum number of worker threads (also sizes the timer/wait queues).
    max: usize,
    // Stack size of each worker thread, in bytes.
    stack_size: usize,
    // Idle worker sleep timeout, in milliseconds.
    timeout: u64,
    // Timer tick interval in milliseconds; `None` disables per-worker timers.
    interval: Option<usize>,
    // Ties the builder to the task output type `O` without storing one.
    marker: PhantomData<O>,
}
1557
// SAFETY: NOTE(review) — these impls assert thread-safety for *any* `O`/`P`,
// including types that are not themselves Send/Sync. Confirm that every pool
// implementation used with this builder is safe to move and share across
// threads before relying on these blanket impls.
unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Send
    for MultiTaskRuntimeBuilder<O, P>
{
}
unsafe impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>> Sync
    for MultiTaskRuntimeBuilder<O, P>
{
}
1566
1567impl<O: Default + 'static> Default for MultiTaskRuntimeBuilder<O> {
1568 fn default() -> Self {
1570 #[cfg(not(target_arch = "wasm32"))]
1571 let core_len = num_cpus::get(); #[cfg(target_arch = "wasm32")]
1573 let core_len = 1; let pool = StealableTaskPool::with(core_len,
1575 65535,
1576 [1, 1],
1577 3000);
1578 MultiTaskRuntimeBuilder::new(pool)
1579 .thread_stack_size(2 * 1024 * 1024)
1580 .set_timer_interval(1)
1581 }
1582}
1583
1584impl<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>
1585 MultiTaskRuntimeBuilder<O, P>
1586{
1587 pub fn new(mut pool: P) -> Self {
1589 #[cfg(not(target_arch = "wasm32"))]
1590 let core_len = num_cpus::get(); #[cfg(target_arch = "wasm32")]
1592 let core_len = 1; MultiTaskRuntimeBuilder {
1595 pool,
1596 prefix: DEFAULT_WORKER_THREAD_PREFIX.to_string(),
1597 init: core_len,
1598 min: core_len,
1599 max: core_len,
1600 stack_size: DEFAULT_THREAD_STACK_SIZE,
1601 timeout: DEFAULT_WORKER_THREAD_SLEEP_TIME,
1602 interval: None,
1603 marker: PhantomData,
1604 }
1605 }
1606
1607 pub fn thread_prefix(mut self, prefix: &str) -> Self {
1609 self.prefix = prefix.to_string();
1610 self
1611 }
1612
1613 pub fn thread_stack_size(mut self, stack_size: usize) -> Self {
1615 self.stack_size = stack_size;
1616 self
1617 }
1618
1619 pub fn init_worker_size(mut self, mut init: usize) -> Self {
1621 if init == 0 {
1622 init = DEFAULT_INIT_WORKER_SIZE;
1624 }
1625
1626 self.init = init;
1627 self
1628 }
1629
1630 pub fn set_worker_limit(mut self, mut min: usize, mut max: usize) -> Self {
1632 if self.init > max {
1633 max = self.init;
1635 }
1636
1637 if min == 0 || min > max {
1638 min = max;
1640 }
1641
1642 self.min = min;
1643 self.max = max;
1644 self
1645 }
1646
1647 pub fn set_timeout(mut self, timeout: u64) -> Self {
1649 self.timeout = timeout;
1650 self
1651 }
1652
1653 pub fn set_timer_interval(mut self, interval: usize) -> Self {
1655 self.interval = Some(interval);
1656 self
1657 }
1658
1659 pub fn build(mut self) -> MultiTaskRuntime<O, P> {
1661 let interval = self.interval;
1663 let mut timers = if let Some(_) = interval {
1664 Some(Vec::with_capacity(self.max))
1665 } else {
1666 None
1667 };
1668 for _ in 0..self.max {
1669 if let Some(vec) = &mut timers {
1671 let timer = AsyncTaskTimerByNotCancel::new();
1672 let producor = timer.producor.clone();
1673 let timer = Arc::new(timer);
1674 vec.push((producor, timer));
1675 };
1676 }
1677
1678 let rt_uid = alloc_rt_uid();
1680 let waits = Arc::new(ArrayQueue::new(self.max));
1681 let mut pool = self.pool;
1682 pool.set_waits(waits.clone()); let pool = Arc::new(pool);
1684 let runtime = MultiTaskRuntime(Arc::new((
1685 rt_uid,
1686 pool,
1687 timers,
1688 AtomicUsize::new(0),
1689 waits,
1690 AtomicUsize::new(0),
1691 AtomicUsize::new(0),
1692 )));
1693
1694 let mut builders = Vec::with_capacity(self.init);
1696 for index in 0..self.init {
1697 let builder = Builder::new()
1698 .name(self.prefix.clone() + "-" + index.to_string().as_str())
1699 .stack_size(self.stack_size);
1700 builders.push(builder);
1701 }
1702
1703 let min = self.min;
1705 for index in 0..builders.len() {
1706 let builder = builders.remove(0);
1707 let runtime = runtime.clone();
1708 let timeout = self.timeout;
1709 let timer = if let Some(timers) = &(runtime.0).2 {
1710 let (_, timer) = &timers[index];
1711 Some(timer.clone())
1712 } else {
1713 None
1714 };
1715
1716 spawn_worker_thread(builder, index, runtime, min, timeout, interval, timer);
1717 }
1718
1719 runtime
1720 }
1721}
1722
1723fn spawn_worker_thread<
1725 O: Default + 'static,
1726 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
1727>(
1728 builder: Builder,
1729 index: usize,
1730 runtime: MultiTaskRuntime<O, P>,
1731 min: usize,
1732 timeout: u64,
1733 interval: Option<usize>,
1734 timer: Option<Arc<AsyncTaskTimerByNotCancel<P, O>>>,
1735) {
1736 if let Some(timer) = timer {
1737 let rt_uid = runtime.get_id();
1739 let _ = builder.spawn(move || {
1740 if let Err(e) = PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| unsafe {
1742 *thread_id.get() = rt_uid << 32 | index & 0xffffffff;
1743 }) {
1744 panic!(
1745 "Multi thread runtime startup failed, thread id: {:?}, reason: {:?}",
1746 index, e
1747 );
1748 }
1749
1750 let runtime_copy = runtime.clone();
1752 match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |rt| {
1753 let raw = Arc::into_raw(Arc::new(runtime_copy.to_local_runtime()))
1754 as *mut LocalAsyncRuntime<O> as *mut ();
1755 rt.store(raw, Ordering::Relaxed);
1756 }) {
1757 Err(e) => {
1758 panic!("Bind multi runtime to local thread failed, reason: {:?}", e);
1759 }
1760 Ok(_) => (),
1761 }
1762
1763 timer_work_loop(
1765 runtime,
1766 index,
1767 min,
1768 timeout,
1769 interval.unwrap() as u64,
1770 timer,
1771 );
1772 });
1773 } else {
1774 let rt_uid = runtime.get_id();
1776 let _ = builder.spawn(move || {
1777 if let Err(e) = PI_ASYNC_THREAD_LOCAL_ID.try_with(move |thread_id| unsafe {
1779 *thread_id.get() = rt_uid << 32 | index & 0xffffffff;
1780 }) {
1781 panic!(
1782 "Multi thread runtime startup failed, thread id: {:?}, reason: {:?}",
1783 index, e
1784 );
1785 }
1786
1787 let runtime_copy = runtime.clone();
1789 match PI_ASYNC_LOCAL_THREAD_ASYNC_RUNTIME.try_with(move |rt| {
1790 let raw = Arc::into_raw(Arc::new(runtime_copy.to_local_runtime()))
1791 as *mut LocalAsyncRuntime<O> as *mut ();
1792 rt.store(raw, Ordering::Relaxed);
1793 }) {
1794 Err(e) => {
1795 panic!("Bind multi runtime to local thread failed, reason: {:?}", e);
1796 }
1797 Ok(_) => (),
1798 }
1799
1800 work_loop(runtime, index, min, timeout);
1802 });
1803 }
1804}
1805
/// Main loop of a worker that owns a timer: each pass drains newly produced
/// timed tasks into the timer, pops every expired task, then runs or sleeps.
/// NOTE(review): `min` is accepted but never read here — confirm whether a
/// minimum-worker check was intended. The loop has no `break`, so the
/// `close_worker`/`warn!` tail below is currently unreachable.
fn timer_work_loop<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>(
    runtime: MultiTaskRuntime<O, P>,
    index: usize,
    min: usize,
    sleep_timeout: u64,
    timer_interval: u64,
    timer: Arc<AsyncTaskTimerByNotCancel<P, O>>,
) {
    let pool = (runtime.0).1.clone();
    // Shared (is_sleep, mutex, condvar) triple used by spawners to wake us.
    let worker_waker = pool.clone_thread_waker().unwrap();

    // NOTE(review): `sleep_count` is written but never read — dead counter?
    let mut sleep_count = 0;
    let clock = Clock::new();
    loop {
        // Timestamp of this pass, used below to budget the next sleep.
        let timer_run_millis = clock.recent();
        let mut pop_len = 0;
        // Move freshly produced timed tasks into the timer wheel and count them.
        (runtime.0)
            .5
            .fetch_add(timer.consume(),
                Ordering::Relaxed);
        // Pop every batch of tasks whose deadline has passed.
        loop {
            let current_time = timer.is_require_pop();
            if let Some(current_time) = current_time {
                loop {
                    let timed_out = timer.pop(current_time);
                    if let Some(timing_task) = timed_out {
                        match timing_task {
                            AsyncTimingTask::Pended(expired) => {
                                // A parked task timed out: wake it.
                                runtime.wakeup::<O>(&expired);
                            }
                            AsyncTimingTask::WaitRun(expired) => {
                                // A deferred task is due: queue it at the
                                // highest priority, then opportunistically
                                // run whatever the pool hands us.
                                (runtime.0)
                                    .1
                                    .push_priority(DEFAULT_MAX_HIGH_PRIORITY_BOUNDED,
                                        expired);
                                if let Some(task) = pool.try_pop() {
                                    sleep_count = 0;
                                    run_task(&runtime, task);
                                }
                            }
                        }
                        pop_len += 1;

                        // Interleave ordinary task execution with timer pops
                        // so queued work is not starved during a long drain.
                        if let Some(task) = pool.try_pop() {
                            sleep_count = 0;
                            run_task(&runtime, task);
                        }
                    } else {
                        break;
                    }
                }
            } else {
                break;
            }
        }
        // Count the timed tasks handled this pass.
        (runtime.0)
            .6
            .fetch_add(pop_len,
                Ordering::Relaxed);

        match pool.try_pop() {
            None => {
                if runtime.len() > 0 {
                    // Tasks exist elsewhere in the pool; spin again.
                    continue;
                }

                {
                    let (is_sleep, lock, condvar) = &*worker_waker;
                    let mut locked = lock.lock();

                    // Announce we are about to sleep (spawners check this).
                    is_sleep.store(true, Ordering::SeqCst);

                    // Sleep only up to the next timer tick when timed tasks
                    // are pending; otherwise use the configured idle timeout.
                    let diff_time = clock
                        .recent()
                        .duration_since(timer_run_millis)
                        .as_millis() as u64;
                    let real_timeout = if timer.len() == 0 {
                        sleep_timeout
                    } else {
                        if diff_time >= timer_interval {
                            // Tick already due: skip sleeping entirely.
                            // NOTE(review): this `continue` leaves `is_sleep`
                            // set to true and never registers the waker —
                            // confirm the flag should be reset here.
                            continue;
                        } else {
                            timer_interval - diff_time
                        }
                    };

                    // Register in the wait queue so spawners can wake us.
                    (runtime.0).4.push(worker_waker.clone());

                    if condvar
                        .wait_for(&mut locked, Duration::from_millis(real_timeout))
                        .timed_out()
                    {
                        // Timed out with no wakeup: clear the flag ourselves.
                        is_sleep.store(false, Ordering::SeqCst);
                        sleep_count += 1;
                    }
                }
            }
            Some(task) => {
                sleep_count = 0;
                run_task(&runtime, task);
            }
        }
    }

    // Unreachable while the loop above has no `break` (see NOTE at top).
    (runtime.0).1.close_worker();
    warn!(
        "Worker of runtime closed, runtime: {}, worker: {}, thread: {:?}",
        runtime.get_id(),
        index,
        thread::current()
    );
}
1942
/// Main loop of a timerless worker: pop and run tasks, sleeping on the
/// shared condvar when the pool is empty.
/// NOTE(review): `min` is accepted but never read here, and the loop has no
/// `break`, so the `close_worker`/`warn!` tail below is unreachable.
fn work_loop<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>(
    runtime: MultiTaskRuntime<O, P>,
    index: usize,
    min: usize,
    sleep_timeout: u64,
) {
    let pool = (runtime.0).1.clone();
    // Shared (is_sleep, mutex, condvar) triple used by spawners to wake us.
    let worker_waker = pool.clone_thread_waker().unwrap();

    // NOTE(review): `sleep_count` is written but never read — dead counter?
    let mut sleep_count = 0;
    loop {
        match pool.try_pop() {
            None => {
                if runtime.len() > 0 {
                    // Tasks exist elsewhere in the pool; spin again.
                    continue;
                }

                {
                    let (is_sleep, lock, condvar) = &*worker_waker;
                    let mut locked = lock.lock();

                    // Announce we are about to sleep (spawners check this).
                    is_sleep.store(true, Ordering::SeqCst);

                    // Register in the wait queue so spawners can wake us.
                    (runtime.0).4.push(worker_waker.clone());

                    if condvar
                        .wait_for(&mut locked, Duration::from_millis(sleep_timeout))
                        .timed_out()
                    {
                        // Timed out with no wakeup: clear the flag ourselves.
                        is_sleep.store(false, Ordering::SeqCst);
                        sleep_count += 1;
                    }
                }
            }
            Some(task) => {
                sleep_count = 0;
                run_task(&runtime, task);
            }
        }
    }

    // Unreachable while the loop above has no `break` (see NOTE at top).
    (runtime.0).1.close_worker();
    warn!(
        "Worker of runtime closed, runtime: {}, worker: {}, thread: {:?}",
        runtime.get_id(),
        index,
        thread::current()
    );
}
2003
2004#[inline]
2006fn run_task<O: Default + 'static, P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>>(
2007 runtime: &MultiTaskRuntime<O, P>,
2008 task: Arc<AsyncTask<P, O>>,
2009) {
2010 let waker = waker_ref(&task);
2011 let mut context = Context::from_waker(&*waker);
2012 if let Some(mut future) = task.get_inner() {
2013 if let Poll::Pending = future.as_mut().poll(&mut context) {
2014 task.set_inner(Some(future));
2016 }
2017 } else {
2018 (runtime.0).1.push(task);
2020 }
2021}
2022
/// Producer side used to hand a timed task to a worker's timer.
enum TimerTaskProducor<
    O: Default + 'static = (),
    P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = StealableTaskPool<O>,
> {
    // Timer owned by the current worker thread, used directly.
    Local(Arc<AsyncTaskTimerByNotCancel<P, O>>),
    // Channel to a timer owned by another worker thread.
    Foreign(Sender<(usize, AsyncTimingTask<P, O>)>),
}