1use std::{
2 cell::UnsafeCell,
3 future::Future,
4 mem::transmute,
5 pin::Pin,
6 sync::{
7 Arc,
8 atomic::{AtomicBool, AtomicUsize, Ordering},
9 },
10 task::*,
11 thread,
12 time::Duration,
13};
14
15use crossfire::{mpmc, mpsc};
16use tokio::time::{sleep, timeout};
17
18use super::*;
19
20#[allow(unused_must_use)]
21pub struct WorkerPoolBounded<
22 M: Send + Sized + Unpin + 'static,
23 W: Worker<M>,
24 S: WorkerPoolImpl<M, W>,
25>(Arc<WorkerPoolBoundedInner<M, W, S>>);
26
27struct WorkerPoolBoundedInner<M, W, S>
28where
29 M: Send + Sized + Unpin + 'static,
30 W: Worker<M>,
31 S: WorkerPoolImpl<M, W>,
32{
33 worker_count: AtomicUsize,
34 sender: UnsafeCell<Option<TxWraper<Option<M>>>>,
35 min_workers: usize,
36 max_workers: usize,
37 worker_timeout: Duration,
38 inner: S,
39 phantom: std::marker::PhantomData<W>, closing: AtomicBool,
41 notify_sender: mpsc::TxFuture<Option<()>, mpsc::SharedFutureBoth>,
42 notify_recv: UnsafeCell<Option<mpsc::RxFuture<Option<()>, mpsc::SharedFutureBoth>>>,
43 auto: bool,
44 channel_size: usize,
45 real_thread: AtomicBool,
46 bind_cpu: AtomicUsize,
47 max_cpu: usize,
48}
49
50unsafe impl<M, W, S> Send for WorkerPoolBoundedInner<M, W, S>
51where
52 M: Send + Sized + Unpin + 'static,
53 W: Worker<M>,
54 S: WorkerPoolImpl<M, W>,
55{
56}
57
58unsafe impl<M, W, S> Sync for WorkerPoolBoundedInner<M, W, S>
59where
60 M: Send + Sized + Unpin + 'static,
61 W: Worker<M>,
62 S: WorkerPoolImpl<M, W>,
63{
64}
65
66impl<M, W, S> Clone for WorkerPoolBounded<M, W, S>
67where
68 M: Send + Sized + Unpin + 'static,
69 W: Worker<M>,
70 S: WorkerPoolImpl<M, W>,
71{
72 #[inline]
73 fn clone(&self) -> Self {
74 Self(self.0.clone())
75 }
76}
77
78impl<M, W, S> WorkerPoolBounded<M, W, S>
79where
80 M: Send + Sized + Unpin + 'static,
81 W: Worker<M>,
82 S: WorkerPoolImpl<M, W>,
83{
84 pub fn new(
85 inner: S,
86 min_workers: usize,
87 max_workers: usize,
88 channel_size: usize,
89 worker_timeout: Duration,
90 ) -> Self {
91 assert!(min_workers > 0);
92 assert!(max_workers >= min_workers);
93
94 let auto: bool = min_workers < max_workers;
95 if auto {
96 assert!(worker_timeout != ZERO_DUARTION);
97 }
98 let (noti_sender, noti_recv) = mpsc::bounded_future_both(1);
102 let pool = Arc::new(WorkerPoolBoundedInner {
103 sender: UnsafeCell::new(None),
104 inner,
105 worker_count: AtomicUsize::new(0),
106 min_workers,
107 max_workers,
108 channel_size,
109 worker_timeout,
110 phantom: Default::default(),
111 closing: AtomicBool::new(false),
112 notify_sender: noti_sender,
113 notify_recv: UnsafeCell::new(Some(noti_recv)),
114 auto,
115 real_thread: AtomicBool::new(false),
116 bind_cpu: AtomicUsize::new(0),
117 max_cpu: num_cpus::get(),
118 });
119 Self(pool)
120 }
121
122 pub fn set_use_thread(&mut self, ok: bool) {
124 self.0.real_thread.store(ok, Ordering::Release);
125 }
126
127 pub fn start(&self) {
128 let _self = self.0.as_ref();
129 let rx;
130 if !_self.auto && _self.real_thread.load(Ordering::Acquire) {
131 let (sender, recv) = mpmc::bounded_tx_future_rx_blocking(_self.channel_size);
132 _self._sender().replace(TxWraper::Blocking(sender));
133 rx = RxWraper::Blocking(recv);
134 } else {
135 let (sender, recv) = mpmc::bounded_future_both(_self.channel_size);
136 _self._sender().replace(TxWraper::Async(sender));
137 rx = RxWraper::Async(recv);
138 }
139
140 for _ in 0.._self.min_workers {
141 self.0.clone().spawn(true, rx.clone());
142 }
143 if _self.auto {
144 let _pool = self.0.clone();
145 let notify_recv: &mut Option<mpsc::RxFuture<Option<()>, mpsc::SharedFutureBoth>> =
146 unsafe { transmute(_self.notify_recv.get()) };
147 let noti_rx = notify_recv.take().unwrap();
148 tokio::spawn(async move {
149 _pool.monitor(noti_rx, rx).await;
150 });
151 }
152 }
153
154 pub async fn close_async(&self) {
155 let _self = self.0.as_ref();
156 if _self.closing.swap(true, Ordering::SeqCst) {
157 return;
158 }
159 if _self.auto {
160 let _ = _self.notify_sender.send(None).await;
161 }
162 let sender = _self._sender().as_ref().unwrap();
163 loop {
164 let cur = self.get_worker_count();
165 if cur == 0 {
166 break;
167 }
168 debug!("worker pool closing: cur workers {}", cur);
169 for _ in 0..cur {
170 let _ = sender.send(None).await;
171 }
172 sleep(Duration::from_secs(1)).await;
173 }
174 }
175
176 pub fn close(&self) {
178 if let Ok(_rt) = tokio::runtime::Handle::try_current() {
179 warn!("close in runtime thread, spawn close thread");
180 let _self = self.clone();
181 std::thread::spawn(move || {
182 let rt = tokio::runtime::Builder::new_current_thread()
183 .enable_all()
184 .build()
185 .expect("runtime");
186 rt.block_on(async move {
187 _self.close_async().await;
188 });
189 });
190 } else {
191 let rt = tokio::runtime::Builder::new_current_thread()
192 .enable_all()
193 .build()
194 .expect("runtime");
195 let _self = self.clone();
196 rt.block_on(async move {
197 _self.close_async().await;
198 });
199 }
200 }
201
202 pub fn get_worker_count(&self) -> usize {
203 self.0.get_worker_count()
204 }
205
206 pub fn get_inner(&self) -> &S {
207 &self.0.inner
208 }
209
210 #[inline]
211 pub fn try_submit(&self, msg: M) -> Option<M> {
212 let _self = self.0.as_ref();
213 if _self.closing.load(Ordering::Acquire) {
214 return Some(msg);
215 }
216 match _self._sender().as_ref().unwrap().try_send(Some(msg)) {
217 Err(mpmc::TrySendError::Disconnected(m)) => {
218 return m;
219 }
220 Err(mpmc::TrySendError::Full(m)) => {
221 return m;
222 }
223 Ok(_) => return None,
224 }
225 }
226
227 #[inline]
229 pub fn submit<'a>(&'a self, mut msg: M) -> SubmitFuture<'a, M> {
230 let _self = self.0.as_ref();
231 if _self.closing.load(Ordering::Acquire) {
232 return SubmitFuture {
233 send_f: None,
234 res: Some(Err(msg)),
235 };
236 }
237 let sender = _self._sender().as_ref().unwrap();
238 if _self.auto {
239 match sender.try_send(Some(msg)) {
240 Err(mpmc::TrySendError::Disconnected(m)) => {
241 return SubmitFuture {
242 send_f: None,
243 res: Some(Err(m.unwrap())),
244 };
245 }
246 Err(mpmc::TrySendError::Full(m)) => {
247 msg = m.unwrap();
248 }
249 Ok(_) => {
250 return SubmitFuture {
251 send_f: None,
252 res: Some(Ok(())),
253 };
254 }
255 }
256 let worker_count = _self.get_worker_count();
257 if worker_count < _self.max_workers {
258 let _ = _self.notify_sender.try_send(Some(()));
259 }
260 }
261 let send_f = sender.send(Some(msg));
262 return SubmitFuture {
263 send_f: Some(send_f),
264 res: None,
265 };
266 }
267}
268
269pub struct SubmitFuture<'a, M: Send + Sized + Unpin + 'static> {
270 send_f: Option<SendFutureWarper<'a, Option<M>>>,
271 res: Option<Result<(), M>>,
272}
273
274impl<'a, M: Send + Sized + Unpin + 'static> Future for SubmitFuture<'a, M> {
275 type Output = Option<M>;
276
277 fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
278 let _self = self.get_mut();
279 if _self.res.is_some() {
280 match _self.res.take().unwrap() {
281 Ok(()) => return Poll::Ready(None),
282 Err(m) => return Poll::Ready(Some(m)),
283 }
284 }
285 let send_f = _self.send_f.as_mut().unwrap();
286 if let Poll::Ready(r) = Pin::new(send_f).poll(ctx) {
287 match r {
288 Ok(()) => return Poll::Ready(None),
289 Err(mpmc::SendError(e)) => {
290 return Poll::Ready(e);
291 }
292 }
293 }
294 Poll::Pending
295 }
296}
297
298#[async_trait]
299impl<M, W, S> WorkerPoolAsyncInf<M> for WorkerPoolBounded<M, W, S>
300where
301 M: Send + Sized + Unpin + 'static,
302 W: Worker<M>,
303 S: WorkerPoolImpl<M, W>,
304{
305 #[inline]
306 async fn submit(&self, msg: M) -> Option<M> {
307 self.submit(msg).await
308 }
309
310 #[inline]
311 fn try_submit(&self, msg: M) -> Option<M> {
312 self.try_submit(msg)
313 }
314}
315
316impl<M, W, S> WorkerPoolBoundedInner<M, W, S>
317where
318 M: Send + Sized + Unpin + 'static,
319 W: Worker<M>,
320 S: WorkerPoolImpl<M, W>,
321{
322 #[inline(always)]
323 fn _sender(&self) -> &mut Option<TxWraper<Option<M>>> {
324 unsafe { transmute(self.sender.get()) }
325 }
326
327 async fn run_worker_simple(&self, mut worker: W, rx: RxWraper<Option<M>>) {
328 if let Err(_) = worker.init().await {
329 let _ = self.try_exit();
330 worker.on_exit();
331 return;
332 }
333 loop {
334 match rx.recv().await {
335 Ok(item) => {
336 if item.is_none() {
337 let _ = self.try_exit();
338 break;
339 }
340 worker.run(item.unwrap()).await;
341 }
342 Err(_) => {
343 let _ = self.try_exit();
345 break;
346 }
347 }
348 }
349 worker.on_exit();
350 }
351
352 async fn run_worker_adjust(&self, mut worker: W, rx: RxWraper<Option<M>>) {
353 if let Err(_) = worker.init().await {
354 let _ = self.try_exit();
355 worker.on_exit();
356 return;
357 }
358
359 let worker_timeout = self.worker_timeout;
360 let mut is_idle = false;
361 'WORKER_LOOP: loop {
362 if is_idle {
363 match timeout(worker_timeout, rx.recv()).await {
364 Ok(res) => {
365 match res {
366 Ok(item) => {
367 if item.is_none() {
368 let _ = self.try_exit();
369 break 'WORKER_LOOP;
370 }
371 worker.run(item.unwrap()).await;
372 is_idle = false;
373 }
374 Err(_) => {
375 let _ = self.try_exit();
377 worker.on_exit();
378 }
379 }
380 }
381 Err(_) => {
382 if self.try_exit() {
384 break 'WORKER_LOOP;
385 }
386 }
387 }
388 } else {
389 match rx.try_recv() {
390 Err(e) => {
391 if e.is_empty() {
392 is_idle = true;
393 } else {
394 let _ = self.try_exit();
395 break 'WORKER_LOOP;
396 }
397 }
398 Ok(Some(item)) => {
399 worker.run(item).await;
400 is_idle = false;
401 }
402 Ok(None) => {
403 let _ = self.try_exit();
404 break 'WORKER_LOOP;
405 }
406 }
407 }
408 }
409 worker.on_exit();
410 }
411
412 #[inline(always)]
413 pub fn get_worker_count(&self) -> usize {
414 self.worker_count.load(Ordering::Acquire)
415 }
416
417 #[inline(always)]
418 fn spawn(self: Arc<Self>, initial: bool, rx: RxWraper<Option<M>>) {
419 self.worker_count.fetch_add(1, Ordering::SeqCst);
420 let worker = self.inner.spawn();
421 let _self = self.clone();
422 if self.real_thread.load(Ordering::Acquire) {
423 let mut bind_cpu: Option<usize> = None;
424 if _self.bind_cpu.load(Ordering::Acquire) <= _self.max_cpu {
425 let cpu = _self.bind_cpu.fetch_add(1, Ordering::SeqCst);
426 if cpu < _self.max_cpu {
427 bind_cpu = Some(cpu as usize);
428 }
429 }
430 thread::spawn(move || {
431 if let Some(cpu) = bind_cpu {
432 core_affinity::set_for_current(core_affinity::CoreId { id: cpu });
433 }
434 let rt = tokio::runtime::Builder::new_current_thread()
435 .enable_all()
436 .build()
437 .expect("runtime");
438 rt.block_on(async move {
439 if initial || !_self.auto {
440 _self.run_worker_simple(worker, rx).await
441 } else {
442 _self.run_worker_adjust(worker, rx).await
443 }
444 });
445 });
446 } else {
447 tokio::spawn(async move {
448 if initial || !_self.auto {
449 _self.run_worker_simple(worker, rx).await
450 } else {
451 _self.run_worker_adjust(worker, rx).await
452 }
453 });
454 }
455 }
456
457 #[inline(always)]
459 fn try_exit(&self) -> bool {
460 if self.closing.load(Ordering::Acquire) {
461 self.worker_count.fetch_sub(1, Ordering::SeqCst);
462 return true;
463 }
464 if self.get_worker_count() > self.min_workers {
465 if self.worker_count.fetch_sub(1, Ordering::SeqCst) <= self.min_workers {
466 self.worker_count.fetch_add(1, Ordering::SeqCst); } else {
468 return true; }
470 }
471 return false;
472 }
473
474 async fn monitor(
475 self: Arc<Self>,
476 noti: mpsc::RxFuture<Option<()>, mpsc::SharedFutureBoth>,
477 rx: RxWraper<Option<M>>,
478 ) {
479 let _self = self.as_ref();
480 loop {
481 match timeout(Duration::from_secs(1), noti.recv()).await {
482 Err(_) => {
483 if _self.closing.load(Ordering::Acquire) {
484 return;
485 }
486 continue;
487 }
488 Ok(Ok(Some(_))) => {
489 if _self.closing.load(Ordering::Acquire) {
490 return;
491 }
492 let worker_count = _self.get_worker_count();
493 if worker_count > _self.max_workers {
494 continue;
495 }
496 self.clone().spawn(false, rx.clone());
497 }
498 _ => {
499 println!("monitor exit");
500 return;
501 }
502 }
503 }
504 }
505}
506
507enum TxWraper<M: Send + Sized + Unpin + 'static> {
508 Blocking(mpmc::TxFuture<M, mpmc::SharedSenderFRecvB>),
509 Async(mpmc::TxFuture<M, mpmc::SharedFutureBoth>),
510}
511
512enum RxWraper<M: Send + Sized + Unpin + 'static> {
513 Blocking(mpmc::RxBlocking<M, mpmc::SharedSenderFRecvB>),
514 Async(mpmc::RxFuture<M, mpmc::SharedFutureBoth>),
515}
516
517pub enum SendFutureWarper<'a, M: Send + Sized + Unpin + 'static> {
518 Blocking(mpmc::SendFuture<'a, M, mpmc::SharedSenderFRecvB>),
519 Async(mpmc::SendFuture<'a, M, mpmc::SharedFutureBoth>),
520}
521
522impl<'a, M: Send + Sized + Unpin + 'static> Future for SendFutureWarper<'a, M> {
523 type Output = Result<(), mpmc::SendError<M>>;
524
525 fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
526 let _self = self.get_mut();
527 match _self {
528 Self::Blocking(s) => Pin::new(s).poll(ctx),
529 Self::Async(s) => Pin::new(s).poll(ctx),
530 }
531 }
532}
533
534impl<M: Send + Sized + Unpin + 'static> TxWraper<M> {
535 #[inline(always)]
536 fn try_send(&self, m: M) -> Result<(), mpmc::TrySendError<M>> {
537 match self {
538 Self::Blocking(s) => s.try_send(m),
539 Self::Async(s) => s.try_send(m),
540 }
541 }
542
543 #[inline(always)]
544 fn send<'a>(&'a self, m: M) -> SendFutureWarper<'a, M> {
545 match self {
546 Self::Blocking(s) => SendFutureWarper::Blocking(s.make_send_future(m)),
547 Self::Async(s) => SendFutureWarper::Async(s.make_send_future(m)),
548 }
549 }
550}
551
552impl<M: Send + Sized + Unpin + 'static> RxWraper<M> {
553 #[inline(always)]
554 async fn recv(&self) -> Result<M, mpmc::RecvError> {
555 match self {
556 Self::Blocking(s) => s.recv(),
557 Self::Async(s) => s.recv().await,
558 }
559 }
560
561 #[inline(always)]
562 fn try_recv(&self) -> Result<M, mpmc::TryRecvError> {
563 match self {
564 Self::Blocking(s) => s.try_recv(),
565 Self::Async(s) => s.try_recv(),
566 }
567 }
568}
569
570impl<M: Send + Sized + Unpin + 'static> Clone for RxWraper<M> {
571 fn clone(&self) -> Self {
572 match self {
573 Self::Blocking(s) => Self::Blocking(s.clone()),
574 Self::Async(s) => Self::Async(s.clone()),
575 }
576 }
577}
578
#[cfg(test)]
mod tests {

    use crossfire::mpsc;
    use tokio::time::{Duration, sleep};

    use super::*;

    /// Factory for the async test worker.
    #[allow(dead_code)]
    struct MyWorkerPoolImpl();

    /// Worker that simulates work with an async sleep.
    struct MyWorker();

    /// Test job: an id plus the channel used to report completion.
    struct MyMsg(i64, mpsc::TxFuture<(), mpsc::SharedFutureBoth>);

    impl WorkerPoolImpl<MyMsg, MyWorker> for MyWorkerPoolImpl {
        fn spawn(&self) -> MyWorker {
            MyWorker()
        }
    }

    #[async_trait]
    impl Worker<MyMsg> for MyWorker {
        async fn run(&mut self, msg: MyMsg) {
            sleep(Duration::from_millis(1)).await;
            println!("done {}", msg.0);
            let _ = msg.1.send(()).await;
        }
    }

    type MyWorkerPool = WorkerPoolBounded<MyMsg, MyWorker, MyWorkerPoolImpl>;

    /// Builds the two-thread runtime every test runs on.
    fn new_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .worker_threads(2)
            .build()
            .unwrap()
    }

    /// Scenario shared by all tests.
    ///
    /// Starts the pool and submits two jobs from each of five concurrent
    /// tasks. After two timeouts it checks that the pool shrank back to
    /// `min_workers`, then runs ten more jobs one at a time.
    async fn drive_pool<W, S>(
        pool: &WorkerPoolBounded<MyMsg, W, S>,
        min_workers: usize,
        max_workers: usize,
        worker_timeout: Duration,
    ) where
        W: Worker<MyMsg>,
        S: WorkerPoolImpl<MyMsg, W>,
    {
        pool.start();
        let mut ths = Vec::new();
        for i in 0..5 {
            let _pool = pool.clone();
            ths.push(tokio::task::spawn(async move {
                let (done_tx, done_rx) = mpsc::bounded_future_both(10);
                for j in 0..2 {
                    let m = i * 10 + j;
                    println!("submit {} in {}/{}", m, j, i);
                    _pool.submit(MyMsg(m, done_tx.clone())).await;
                }
                for _j in 0..2 {
                    let _ = done_rx.recv().await;
                }
            }));
        }
        for th in ths {
            let _ = th.await;
        }
        let workers = pool.get_worker_count();
        println!("cur workers {} might reach max {}", workers, max_workers);
        // Give the on-demand workers time to hit their idle timeout.
        sleep(worker_timeout * 2).await;
        let workers = pool.get_worker_count();
        println!("cur workers: {}, extra should exit due to timeout", workers);
        assert_eq!(workers, min_workers);

        let (done_tx, done_rx) = mpsc::bounded_future_both(1);
        for j in 0..10 {
            pool.submit(MyMsg(80 + j, done_tx.clone())).await;
            let _ = done_rx.recv().await;
        }
    }

    /// Closes the pool from inside the runtime, where `close` hands the work
    /// to a helper thread, and checks the workers are gone two seconds later.
    #[test]
    fn bounded_workerpool_adjust_close_async() {
        let min_workers = 1;
        let max_workers = 4;
        let worker_timeout = Duration::from_secs(1);
        let rt = new_runtime();
        let worker_pool = MyWorkerPool::new(
            MyWorkerPoolImpl(),
            min_workers,
            max_workers,
            1,
            worker_timeout,
        );
        let _worker_pool = worker_pool.clone();
        rt.block_on(async move {
            drive_pool(&worker_pool, min_workers, max_workers, worker_timeout).await;
            println!("closing");
            _worker_pool.close();
            sleep(Duration::from_secs(2)).await;
            assert_eq!(_worker_pool.get_worker_count(), 0);
        });
    }

    /// Closes the pool from outside the runtime, where `close` blocks until
    /// every worker has exited.
    #[test]
    fn bounded_workerpool_adjust_close() {
        let min_workers = 1;
        let max_workers = 4;
        let worker_timeout = Duration::from_secs(1);
        let rt = new_runtime();
        let worker_pool = MyWorkerPool::new(
            MyWorkerPoolImpl(),
            min_workers,
            max_workers,
            1,
            worker_timeout,
        );
        let _worker_pool = worker_pool.clone();
        rt.block_on(async move {
            drive_pool(&worker_pool, min_workers, max_workers, worker_timeout).await;
        });
        println!("closing");
        _worker_pool.close();
        assert_eq!(_worker_pool.get_worker_count(), 0);
    }

    /// Factory for the blocking test worker.
    #[allow(dead_code)]
    struct MyBlockingWorkerPoolImpl();

    /// Worker that simulates work by blocking its thread.
    struct MyBlockingWorker();

    impl WorkerPoolImpl<MyMsg, MyBlockingWorker> for MyBlockingWorkerPoolImpl {
        fn spawn(&self) -> MyBlockingWorker {
            MyBlockingWorker()
        }
    }

    #[async_trait]
    impl Worker<MyMsg> for MyBlockingWorker {
        async fn run(&mut self, msg: MyMsg) {
            std::thread::sleep(Duration::from_millis(1));
            println!("done {}", msg.0);
            let _ = msg.1.send(()).await;
        }
    }

    type MyBlockingWorkerPool =
        WorkerPoolBounded<MyMsg, MyBlockingWorker, MyBlockingWorkerPoolImpl>;

    /// Same scenario with every worker on its own OS thread.
    #[test]
    fn bounded_workerpool_adjust_real_thread() {
        let min_workers = 1;
        let max_workers = 4;
        let worker_timeout = Duration::from_secs(1);
        let rt = new_runtime();
        let mut worker_pool = MyBlockingWorkerPool::new(
            MyBlockingWorkerPoolImpl(),
            min_workers,
            max_workers,
            1,
            worker_timeout,
        );
        worker_pool.set_use_thread(true);
        let _worker_pool = worker_pool.clone();
        rt.block_on(async move {
            drive_pool(&worker_pool, min_workers, max_workers, worker_timeout).await;
        });
        println!("closing");
        _worker_pool.close();
        assert_eq!(_worker_pool.get_worker_count(), 0);
    }
}