1use std::{
2 cell::UnsafeCell,
3 future::Future,
4 mem::transmute,
5 pin::Pin,
6 sync::{
7 Arc,
8 atomic::{AtomicBool, AtomicUsize, Ordering},
9 },
10 task::*,
11 thread,
12 time::Duration,
13};
14
15use crossfire::{mpmc, mpsc};
16use tokio::time::{sleep, timeout};
17
18use super::*;
19
/// A cloneable handle to a bounded worker pool.
///
/// Messages of type `M` are dispatched to workers of type `W`, which are
/// created on demand by the factory `S`. All clones share the same pool
/// state through the inner `Arc`.
#[allow(unused_must_use)]
pub struct WorkerPoolBounded<
    M: Send + Sized + Unpin + 'static,
    W: Worker<M>,
    S: WorkerPoolImpl<M, W>,
>(Arc<WorkerPoolBoundedInner<M, W, S>>);
26
/// Shared state behind `WorkerPoolBounded`.
struct WorkerPoolBoundedInner<M, W, S>
where
    M: Send + Sized + Unpin + 'static,
    W: Worker<M>,
    S: WorkerPoolImpl<M, W>,
{
    // Number of live workers (incremented in `spawn`, decremented in `try_exit`).
    worker_count: AtomicUsize,
    // Message sender; `None` until `start()` installs the channel.
    // Accessed mutably via `_sender()` through the UnsafeCell.
    sender: UnsafeCell<Option<TxWraper<Option<M>>>>,
    min_workers: usize,
    max_workers: usize,
    // Idle timeout after which extra (auto-scaled) workers exit.
    worker_timeout: Duration,
    // User-supplied worker factory.
    inner: S,
    phantom: std::marker::PhantomData<W>,
    // Set once by `close_async`; checked by workers and submitters.
    closing: AtomicBool,
    // Capacity-1 channel used to wake the monitor for scale-up.
    notify_sender: mpsc::TxFuture<Option<()>, mpsc::SharedFutureBoth>,
    // Receiver half, taken by `start()` when auto-scaling is enabled.
    notify_recv: UnsafeCell<Option<mpsc::RxFuture<Option<()>, mpsc::SharedFutureBoth>>>,
    // True when min_workers < max_workers (auto-scaling mode).
    auto: bool,
    channel_size: usize,
    // When true, workers run on dedicated OS threads instead of tokio tasks.
    real_thread: AtomicBool,
    // Next CPU id to pin a real-thread worker to.
    bind_cpu: AtomicUsize,
    max_cpu: usize,
}
49
// SAFETY: NOTE(review): the UnsafeCell fields (`sender`, `notify_recv`) are
// written only from `start()`; soundness of sending this type across threads
// assumes `start()` completes before any concurrent access — confirm callers
// uphold that ordering.
unsafe impl<M, W, S> Send for WorkerPoolBoundedInner<M, W, S>
where
    M: Send + Sized + Unpin + 'static,
    W: Worker<M>,
    S: WorkerPoolImpl<M, W>,
{
}
57
// SAFETY: NOTE(review): shared access is only sound because after `start()`
// the UnsafeCell contents are read-only (`_sender()` readers never mutate
// past initialization) — verify no later writer exists.
unsafe impl<M, W, S> Sync for WorkerPoolBoundedInner<M, W, S>
where
    M: Send + Sized + Unpin + 'static,
    W: Worker<M>,
    S: WorkerPoolImpl<M, W>,
{
}
65
66impl<M, W, S> Clone for WorkerPoolBounded<M, W, S>
67where
68 M: Send + Sized + Unpin + 'static,
69 W: Worker<M>,
70 S: WorkerPoolImpl<M, W>,
71{
72 #[inline]
73 fn clone(&self) -> Self {
74 Self(self.0.clone())
75 }
76}
77
impl<M, W, S> WorkerPoolBounded<M, W, S>
where
    M: Send + Sized + Unpin + 'static,
    W: Worker<M>,
    S: WorkerPoolImpl<M, W>,
{
    /// Creates a pool keeping between `min_workers` and `max_workers`
    /// workers, fed through a bounded channel of `channel_size` slots.
    ///
    /// Auto-scaling is enabled when `min_workers < max_workers`; in that
    /// mode `worker_timeout` is how long an extra worker may stay idle
    /// before exiting, and must be non-zero.
    ///
    /// # Panics
    /// Panics if `min_workers == 0`, if `max_workers < min_workers`, or if
    /// `worker_timeout` is zero while auto-scaling is enabled.
    pub fn new(
        inner: S, min_workers: usize, max_workers: usize, channel_size: usize,
        worker_timeout: Duration,
    ) -> Self {
        assert!(min_workers > 0);
        assert!(max_workers >= min_workers);

        let auto: bool = min_workers < max_workers;
        if auto {
            assert!(worker_timeout != ZERO_DUARTION);
        }
        // Capacity-1 channel: multiple scale-up notifications coalesce.
        let (noti_sender, noti_recv) = mpsc::bounded_future_both(1);
        let pool = Arc::new(WorkerPoolBoundedInner {
            sender: UnsafeCell::new(None),
            inner,
            worker_count: AtomicUsize::new(0),
            min_workers,
            max_workers,
            channel_size,
            worker_timeout,
            phantom: Default::default(),
            closing: AtomicBool::new(false),
            notify_sender: noti_sender,
            notify_recv: UnsafeCell::new(Some(noti_recv)),
            auto,
            real_thread: AtomicBool::new(false),
            bind_cpu: AtomicUsize::new(0),
            max_cpu: num_cpus::get(),
        });
        Self(pool)
    }

    /// Switches workers to dedicated OS threads (with CPU pinning) instead
    /// of tokio tasks. Must be called before `start()` to take effect.
    pub fn set_use_thread(&mut self, ok: bool) {
        self.0.real_thread.store(ok, Ordering::Release);
    }

    /// Installs the message channel, spawns the initial `min_workers`
    /// workers, and (in auto-scaling mode) the monitor task.
    ///
    /// NOTE(review): this mutates the UnsafeCell-backed sender slot and
    /// takes the notify receiver; calling it more than once, or concurrently
    /// with `submit`, would be unsound / panic — confirm it is invoked
    /// exactly once before any submission.
    pub fn start(&self) {
        let _self = self.0.as_ref();
        let rx;
        // A blocking receiver is only useful when workers run on real
        // threads and the worker set is fixed (no monitor to spawn more).
        if !_self.auto && _self.real_thread.load(Ordering::Acquire) {
            let (sender, recv) = mpmc::bounded_tx_future_rx_blocking(_self.channel_size);
            _self._sender().replace(TxWraper::Blocking(sender));
            rx = RxWraper::Blocking(recv);
        } else {
            let (sender, recv) = mpmc::bounded_future_both(_self.channel_size);
            _self._sender().replace(TxWraper::Async(sender));
            rx = RxWraper::Async(recv);
        }

        for _ in 0.._self.min_workers {
            // `initial = true`: these workers never exit on idle timeout.
            self.0.clone().spawn(true, rx.clone());
        }
        if _self.auto {
            let _pool = self.0.clone();
            let notify_recv: &mut Option<mpsc::RxFuture<Option<()>, mpsc::SharedFutureBoth>> =
                unsafe { transmute(_self.notify_recv.get()) };
            let noti_rx = notify_recv.take().unwrap();
            tokio::spawn(async move {
                _pool.monitor(noti_rx, rx).await;
            });
        }
    }

    /// Marks the pool as closing and sends one `None` shutdown marker per
    /// live worker, retrying every second until all workers have exited.
    /// Idempotent: subsequent calls return immediately.
    pub async fn close_async(&self) {
        let _self = self.0.as_ref();
        if _self.closing.swap(true, Ordering::SeqCst) {
            return;
        }
        if _self.auto {
            // Wake the monitor so it observes `closing` and exits.
            let _ = _self.notify_sender.send(None).await;
        }
        let sender = _self._sender().as_ref().unwrap();
        loop {
            let cur = self.get_worker_count();
            if cur == 0 {
                break;
            }
            debug!("worker pool closing: cur workers {}", cur);
            for _ in 0..cur {
                // `None` is the per-worker shutdown marker.
                let _ = sender.send(None).await;
            }
            sleep(Duration::from_secs(1)).await;
        }
    }

    /// Blocking close. When called from inside a tokio runtime the shutdown
    /// is delegated to a detached helper thread and this function returns
    /// before the workers have actually exited; outside a runtime it blocks
    /// until `close_async` completes.
    pub fn close(&self) {
        if let Ok(_rt) = tokio::runtime::Handle::try_current() {
            warn!("close in runtime thread, spawn close thread");
            let _self = self.clone();
            std::thread::spawn(move || {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .expect("runtime");
                rt.block_on(async move {
                    _self.close_async().await;
                });
            });
        } else {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("runtime");
            let _self = self.clone();
            rt.block_on(async move {
                _self.close_async().await;
            });
        }
    }

    /// Current number of live workers.
    pub fn get_worker_count(&self) -> usize {
        self.0.get_worker_count()
    }

    /// Borrow of the user-supplied worker factory.
    pub fn get_inner(&self) -> &S {
        &self.0.inner
    }

    /// Non-blocking submit. Returns `None` on success, or `Some(msg)` when
    /// the pool is closing, the channel is full, or it is disconnected.
    #[inline]
    pub fn try_submit(&self, msg: M) -> Option<M> {
        let _self = self.0.as_ref();
        if _self.closing.load(Ordering::Acquire) {
            return Some(msg);
        }
        match _self._sender().as_ref().unwrap().try_send(Some(msg)) {
            Err(mpmc::TrySendError::Disconnected(m)) => {
                return m;
            }
            Err(mpmc::TrySendError::Full(m)) => {
                return m;
            }
            Ok(_) => return None,
        }
    }

    /// Asynchronous submit. The returned future resolves to `None` on
    /// success, or `Some(msg)` when the message could not be delivered.
    ///
    /// In auto-scaling mode a full channel additionally notifies the
    /// monitor to spawn an extra worker (if below `max_workers`) before
    /// awaiting channel capacity.
    #[inline]
    pub fn submit<'a>(&'a self, mut msg: M) -> SubmitFuture<'a, M> {
        let _self = self.0.as_ref();
        if _self.closing.load(Ordering::Acquire) {
            return SubmitFuture { send_f: None, res: Some(Err(msg)) };
        }
        let sender = _self._sender().as_ref().unwrap();
        if _self.auto {
            match sender.try_send(Some(msg)) {
                Err(mpmc::TrySendError::Disconnected(m)) => {
                    return SubmitFuture { send_f: None, res: Some(Err(m.unwrap())) };
                }
                Err(mpmc::TrySendError::Full(m)) => {
                    // Channel full: fall through to the awaited send below.
                    msg = m.unwrap();
                }
                Ok(_) => {
                    return SubmitFuture { send_f: None, res: Some(Ok(())) };
                }
            }
            let worker_count = _self.get_worker_count();
            if worker_count < _self.max_workers {
                // Best effort: a coalesced notification is fine.
                let _ = _self.notify_sender.try_send(Some(()));
            }
        }
        let send_f = sender.send(Some(msg));
        return SubmitFuture { send_f: Some(send_f), res: None };
    }
}
253
/// Future returned by `WorkerPoolBounded::submit`.
///
/// Resolves to `None` on successful delivery, or `Some(msg)` handing the
/// undeliverable message back to the caller.
pub struct SubmitFuture<'a, M: Send + Sized + Unpin + 'static> {
    // In-flight channel send; `None` when the result was decided eagerly.
    send_f: Option<SendFutureWarper<'a, Option<M>>>,
    // Eager result captured at construction (closing pool / try_send outcome).
    res: Option<Result<(), M>>,
}
258
259impl<'a, M: Send + Sized + Unpin + 'static> Future for SubmitFuture<'a, M> {
260 type Output = Option<M>;
261
262 fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
263 let _self = self.get_mut();
264 if _self.res.is_some() {
265 match _self.res.take().unwrap() {
266 Ok(()) => return Poll::Ready(None),
267 Err(m) => return Poll::Ready(Some(m)),
268 }
269 }
270 let send_f = _self.send_f.as_mut().unwrap();
271 if let Poll::Ready(r) = Pin::new(send_f).poll(ctx) {
272 match r {
273 Ok(()) => return Poll::Ready(None),
274 Err(mpmc::SendError(e)) => {
275 return Poll::Ready(e);
276 }
277 }
278 }
279 Poll::Pending
280 }
281}
282
/// Object-safe async facade: forwards to the inherent `submit`/`try_submit`
/// methods so the pool can be used through the `WorkerPoolAsyncInf` trait.
#[async_trait]
impl<M, W, S> WorkerPoolAsyncInf<M> for WorkerPoolBounded<M, W, S>
where
    M: Send + Sized + Unpin + 'static,
    W: Worker<M>,
    S: WorkerPoolImpl<M, W>,
{
    /// Delegates to the inherent `submit`; `None` means delivered.
    #[inline]
    async fn submit(&self, msg: M) -> Option<M> {
        self.submit(msg).await
    }

    /// Delegates to the inherent `try_submit`.
    #[inline]
    fn try_submit(&self, msg: M) -> Option<M> {
        self.try_submit(msg)
    }
}
300
301impl<M, W, S> WorkerPoolBoundedInner<M, W, S>
302where
303 M: Send + Sized + Unpin + 'static,
304 W: Worker<M>,
305 S: WorkerPoolImpl<M, W>,
306{
307 #[inline(always)]
308 fn _sender(&self) -> &mut Option<TxWraper<Option<M>>> {
309 unsafe { transmute(self.sender.get()) }
310 }
311
312 async fn run_worker_simple(&self, mut worker: W, rx: RxWraper<Option<M>>) {
313 if let Err(_) = worker.init().await {
314 let _ = self.try_exit();
315 worker.on_exit();
316 return;
317 }
318 loop {
319 match rx.recv().await {
320 Ok(item) => {
321 if item.is_none() {
322 let _ = self.try_exit();
323 break;
324 }
325 worker.run(item.unwrap()).await;
326 }
327 Err(_) => {
328 let _ = self.try_exit();
330 break;
331 }
332 }
333 }
334 worker.on_exit();
335 }
336
337 async fn run_worker_adjust(&self, mut worker: W, rx: RxWraper<Option<M>>) {
338 if let Err(_) = worker.init().await {
339 let _ = self.try_exit();
340 worker.on_exit();
341 return;
342 }
343
344 let worker_timeout = self.worker_timeout;
345 let mut is_idle = false;
346 'WORKER_LOOP: loop {
347 if is_idle {
348 match timeout(worker_timeout, rx.recv()).await {
349 Ok(res) => {
350 match res {
351 Ok(item) => {
352 if item.is_none() {
353 let _ = self.try_exit();
354 break 'WORKER_LOOP;
355 }
356 worker.run(item.unwrap()).await;
357 is_idle = false;
358 }
359 Err(_) => {
360 let _ = self.try_exit();
362 worker.on_exit();
363 }
364 }
365 }
366 Err(_) => {
367 if self.try_exit() {
369 break 'WORKER_LOOP;
370 }
371 }
372 }
373 } else {
374 match rx.try_recv() {
375 Err(e) => {
376 if e.is_empty() {
377 is_idle = true;
378 } else {
379 let _ = self.try_exit();
380 break 'WORKER_LOOP;
381 }
382 }
383 Ok(Some(item)) => {
384 worker.run(item).await;
385 is_idle = false;
386 }
387 Ok(None) => {
388 let _ = self.try_exit();
389 break 'WORKER_LOOP;
390 }
391 }
392 }
393 }
394 worker.on_exit();
395 }
396
397 #[inline(always)]
398 pub fn get_worker_count(&self) -> usize {
399 self.worker_count.load(Ordering::Acquire)
400 }
401
402 #[inline(always)]
403 fn spawn(self: Arc<Self>, initial: bool, rx: RxWraper<Option<M>>) {
404 self.worker_count.fetch_add(1, Ordering::SeqCst);
405 let worker = self.inner.spawn();
406 let _self = self.clone();
407 if self.real_thread.load(Ordering::Acquire) {
408 let mut bind_cpu: Option<usize> = None;
409 if _self.bind_cpu.load(Ordering::Acquire) <= _self.max_cpu {
410 let cpu = _self.bind_cpu.fetch_add(1, Ordering::SeqCst);
411 if cpu < _self.max_cpu {
412 bind_cpu = Some(cpu as usize);
413 }
414 }
415 thread::spawn(move || {
416 if let Some(cpu) = bind_cpu {
417 core_affinity::set_for_current(core_affinity::CoreId { id: cpu });
418 }
419 let rt = tokio::runtime::Builder::new_current_thread()
420 .enable_all()
421 .build()
422 .expect("runtime");
423 rt.block_on(async move {
424 if initial || !_self.auto {
425 _self.run_worker_simple(worker, rx).await
426 } else {
427 _self.run_worker_adjust(worker, rx).await
428 }
429 });
430 });
431 } else {
432 tokio::spawn(async move {
433 if initial || !_self.auto {
434 _self.run_worker_simple(worker, rx).await
435 } else {
436 _self.run_worker_adjust(worker, rx).await
437 }
438 });
439 }
440 }
441
442 #[inline(always)]
444 fn try_exit(&self) -> bool {
445 if self.closing.load(Ordering::Acquire) {
446 self.worker_count.fetch_sub(1, Ordering::SeqCst);
447 return true;
448 }
449 if self.get_worker_count() > self.min_workers {
450 if self.worker_count.fetch_sub(1, Ordering::SeqCst) <= self.min_workers {
451 self.worker_count.fetch_add(1, Ordering::SeqCst); } else {
453 return true; }
455 }
456 return false;
457 }
458
459 async fn monitor(
460 self: Arc<Self>, noti: mpsc::RxFuture<Option<()>, mpsc::SharedFutureBoth>,
461 rx: RxWraper<Option<M>>,
462 ) {
463 let _self = self.as_ref();
464 loop {
465 match timeout(Duration::from_secs(1), noti.recv()).await {
466 Err(_) => {
467 if _self.closing.load(Ordering::Acquire) {
468 return;
469 }
470 continue;
471 }
472 Ok(Ok(Some(_))) => {
473 if _self.closing.load(Ordering::Acquire) {
474 return;
475 }
476 let worker_count = _self.get_worker_count();
477 if worker_count > _self.max_workers {
478 continue;
479 }
480 self.clone().spawn(false, rx.clone());
481 }
482 _ => {
483 println!("monitor exit");
484 return;
485 }
486 }
487 }
488 }
489}
490
/// Sender wrapper over the two channel flavors the pool can use.
/// (NOTE(review): "Wraper" is a typo for "Wrapper", but renaming would break
/// references; kept as-is.)
enum TxWraper<M: Send + Sized + Unpin + 'static> {
    // Async sender paired with a blocking receiver (real-thread workers).
    Blocking(mpmc::TxFuture<M, mpmc::SharedSenderFRecvB>),
    // Fully async sender/receiver pair.
    Async(mpmc::TxFuture<M, mpmc::SharedFutureBoth>),
}
495
/// Receiver wrapper matching `TxWraper`: a blocking receiver for
/// real-thread workers, or a fully async one otherwise.
enum RxWraper<M: Send + Sized + Unpin + 'static> {
    Blocking(mpmc::RxBlocking<M, mpmc::SharedSenderFRecvB>),
    Async(mpmc::RxFuture<M, mpmc::SharedFutureBoth>),
}
500
/// Send-future wrapper unifying the two channel flavors behind one
/// `Future` impl, so `SubmitFuture` can hold either.
pub enum SendFutureWarper<'a, M: Send + Sized + Unpin + 'static> {
    Blocking(mpmc::SendFuture<'a, M, mpmc::SharedSenderFRecvB>),
    Async(mpmc::SendFuture<'a, M, mpmc::SharedFutureBoth>),
}
505
506impl<'a, M: Send + Sized + Unpin + 'static> Future for SendFutureWarper<'a, M> {
507 type Output = Result<(), mpmc::SendError<M>>;
508
509 fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
510 let _self = self.get_mut();
511 match _self {
512 Self::Blocking(s) => Pin::new(s).poll(ctx),
513 Self::Async(s) => Pin::new(s).poll(ctx),
514 }
515 }
516}
517
518impl<M: Send + Sized + Unpin + 'static> TxWraper<M> {
519 #[inline(always)]
520 fn try_send(&self, m: M) -> Result<(), mpmc::TrySendError<M>> {
521 match self {
522 Self::Blocking(s) => s.try_send(m),
523 Self::Async(s) => s.try_send(m),
524 }
525 }
526
527 #[inline(always)]
528 fn send<'a>(&'a self, m: M) -> SendFutureWarper<'a, M> {
529 match self {
530 Self::Blocking(s) => SendFutureWarper::Blocking(s.make_send_future(m)),
531 Self::Async(s) => SendFutureWarper::Async(s.make_send_future(m)),
532 }
533 }
534}
535
impl<M: Send + Sized + Unpin + 'static> RxWraper<M> {
    /// Receives one message from either channel flavor.
    ///
    /// NOTE(review): the `Blocking` arm performs a synchronous `recv()`
    /// inside an async fn — this blocks the executor thread. It appears
    /// intentional: the Blocking flavor is only used by workers running on
    /// dedicated OS threads (see `spawn`) — confirm no tokio task ever
    /// holds a Blocking receiver.
    #[inline(always)]
    async fn recv(&self) -> Result<M, mpmc::RecvError> {
        match self {
            Self::Blocking(s) => s.recv(),
            Self::Async(s) => s.recv().await,
        }
    }

    /// Non-blocking receive, forwarded to the wrapped receiver.
    #[inline(always)]
    fn try_recv(&self) -> Result<M, mpmc::TryRecvError> {
        match self {
            Self::Blocking(s) => s.try_recv(),
            Self::Async(s) => s.try_recv(),
        }
    }
}
553
554impl<M: Send + Sized + Unpin + 'static> Clone for RxWraper<M> {
555 fn clone(&self) -> Self {
556 match self {
557 Self::Blocking(s) => Self::Blocking(s.clone()),
558 Self::Async(s) => Self::Async(s.clone()),
559 }
560 }
561}
562
#[cfg(test)]
mod tests {

    use crossfire::mpsc;
    use tokio::time::{Duration, sleep};

    use super::*;

    // Minimal worker factory used by the async-worker tests.
    #[allow(dead_code)]
    struct MyWorkerPoolImpl();

    struct MyWorker();

    // Test message: payload plus a completion channel the worker acks on.
    struct MyMsg(i64, mpsc::TxFuture<(), mpsc::SharedFutureBoth>);

    impl WorkerPoolImpl<MyMsg, MyWorker> for MyWorkerPoolImpl {
        fn spawn(&self) -> MyWorker {
            MyWorker()
        }
    }

    #[async_trait]
    impl Worker<MyMsg> for MyWorker {
        // Async-friendly worker: yields with tokio sleep, then acks.
        async fn run(&mut self, msg: MyMsg) {
            sleep(Duration::from_millis(1)).await;
            println!("done {}", msg.0);
            let _ = msg.1.send(()).await;
        }
    }

    type MyWorkerPool = WorkerPoolBounded<MyMsg, MyWorker, MyWorkerPoolImpl>;

    // Auto-scaling pool: scale up under load, shrink back to min after the
    // idle timeout, then close via close_async (delegated through close()
    // inside the runtime).
    // NOTE(review): relies on wall-clock sleeps; may be flaky on slow CI.
    #[test]
    fn bounded_workerpool_adjust_close_async() {
        let min_workers = 1;
        let max_workers = 4;
        let worker_timeout = Duration::from_secs(1);
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .worker_threads(2)
            .build()
            .unwrap();
        let worker_pool =
            MyWorkerPool::new(MyWorkerPoolImpl(), min_workers, max_workers, 1, worker_timeout);
        let _worker_pool = worker_pool.clone();
        rt.block_on(async move {
            worker_pool.start();
            let mut ths = Vec::new();
            // 5 concurrent submitters x 2 messages each, over a size-1
            // channel, to force scale-up notifications.
            for i in 0..5 {
                let _pool = worker_pool.clone();
                ths.push(tokio::task::spawn(async move {
                    let (done_tx, done_rx) = mpsc::bounded_future_both(10);
                    for j in 0..2 {
                        let m = i * 10 + j;
                        println!("submit {} in {}/{}", m, j, i);
                        _pool.submit(MyMsg(m, done_tx.clone())).await;
                    }
                    // Wait for both acks before the submitter exits.
                    for _j in 0..2 {
                        let _ = done_rx.recv().await;
                    }
                }));
            }
            for th in ths {
                let _ = th.await;
            }
            let workers = worker_pool.get_worker_count();
            println!("cur workers {} might reach max {}", workers, max_workers);
            // After twice the idle timeout, extra workers should be gone.
            sleep(worker_timeout * 2).await;
            let workers = worker_pool.get_worker_count();
            println!("cur workers: {}, extra should exit due to timeout", workers);
            assert_eq!(workers, min_workers);

            // Pool still serves traffic at min size.
            let (done_tx, done_rx) = mpsc::bounded_future_both(1);
            for j in 0..10 {
                worker_pool.submit(MyMsg(80 + j, done_tx.clone())).await;
                let _ = done_rx.recv().await;
            }
            println!("closing");
            // close() inside the runtime delegates to a detached thread, so
            // give it time to drain before asserting.
            _worker_pool.close();
            sleep(Duration::from_secs(2)).await;
            assert_eq!(_worker_pool.get_worker_count(), 0);
        });
    }

    // Same scenario as above, but close() is called outside the runtime so
    // it blocks until all workers have exited.
    #[test]
    fn bounded_workerpool_adjust_close() {
        let min_workers = 1;
        let max_workers = 4;
        let worker_timeout = Duration::from_secs(1);
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .worker_threads(2)
            .build()
            .unwrap();
        let worker_pool =
            MyWorkerPool::new(MyWorkerPoolImpl(), min_workers, max_workers, 1, worker_timeout);
        let _worker_pool = worker_pool.clone();
        rt.block_on(async move {
            worker_pool.start();
            let mut ths = Vec::new();
            for i in 0..5 {
                let _pool = worker_pool.clone();
                ths.push(tokio::task::spawn(async move {
                    let (done_tx, done_rx) = mpsc::bounded_future_both(10);
                    for j in 0..2 {
                        let m = i * 10 + j;
                        println!("submit {} in {}/{}", m, j, i);
                        _pool.submit(MyMsg(m, done_tx.clone())).await;
                    }
                    for _j in 0..2 {
                        let _ = done_rx.recv().await;
                    }
                }));
            }
            for th in ths {
                let _ = th.await;
            }
            let workers = worker_pool.get_worker_count();
            println!("cur workers {} might reach max {}", workers, max_workers);
            sleep(worker_timeout * 2).await;
            let workers = worker_pool.get_worker_count();
            println!("cur workers: {}, extra should exit due to timeout", workers);
            assert_eq!(workers, min_workers);

            let (done_tx, done_rx) = mpsc::bounded_future_both(1);
            for j in 0..10 {
                worker_pool.submit(MyMsg(80 + j, done_tx.clone())).await;
                let _ = done_rx.recv().await;
            }
        });
        println!("closing");
        // Outside the runtime: close() blocks until worker_count hits 0.
        _worker_pool.close();
        assert_eq!(_worker_pool.get_worker_count(), 0);
    }

    // Variants using a worker that blocks the thread, for the real-thread
    // (dedicated OS thread) mode.
    #[allow(dead_code)]
    struct MyBlockingWorkerPoolImpl();

    struct MyBlockingWorker();

    impl WorkerPoolImpl<MyMsg, MyBlockingWorker> for MyBlockingWorkerPoolImpl {
        fn spawn(&self) -> MyBlockingWorker {
            MyBlockingWorker()
        }
    }

    #[async_trait]
    impl Worker<MyMsg> for MyBlockingWorker {
        // Deliberately uses std::thread::sleep: safe here because each
        // worker runs on its own OS thread in real-thread mode.
        async fn run(&mut self, msg: MyMsg) {
            std::thread::sleep(Duration::from_millis(1));
            println!("done {}", msg.0);
            let _ = msg.1.send(()).await;
        }
    }

    type MyBlockingWorkerPool =
        WorkerPoolBounded<MyMsg, MyBlockingWorker, MyBlockingWorkerPoolImpl>;

    // Auto-scaling pool with set_use_thread(true): workers run on pinned
    // OS threads; otherwise the same load / shrink / close scenario.
    #[test]
    fn bounded_workerpool_adjust_real_thread() {
        let min_workers = 1;
        let max_workers = 4;
        let worker_timeout = Duration::from_secs(1);
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .worker_threads(2)
            .build()
            .unwrap();
        let mut worker_pool = MyBlockingWorkerPool::new(
            MyBlockingWorkerPoolImpl(),
            min_workers,
            max_workers,
            1,
            worker_timeout,
        );
        worker_pool.set_use_thread(true);
        let _worker_pool = worker_pool.clone();
        rt.block_on(async move {
            worker_pool.start();
            let mut ths = Vec::new();
            for i in 0..5 {
                let _pool = worker_pool.clone();
                ths.push(tokio::task::spawn(async move {
                    let (done_tx, done_rx) = mpsc::bounded_future_both(10);
                    for j in 0..2 {
                        let m = i * 10 + j;
                        println!("submit {} in {}/{}", m, j, i);
                        _pool.submit(MyMsg(m, done_tx.clone())).await;
                    }
                    for _j in 0..2 {
                        let _ = done_rx.recv().await;
                    }
                }));
            }
            for th in ths {
                let _ = th.await;
            }
            let workers = worker_pool.get_worker_count();
            println!("cur workers {} might reach max {}", workers, max_workers);
            sleep(worker_timeout * 2).await;
            let workers = worker_pool.get_worker_count();
            println!("cur workers: {}, extra should exit due to timeout", workers);
            assert_eq!(workers, min_workers);

            let (done_tx, done_rx) = mpsc::bounded_future_both(1);
            for j in 0..10 {
                worker_pool.submit(MyMsg(80 + j, done_tx.clone())).await;
                let _ = done_rx.recv().await;
            }
        });
        println!("closing");
        _worker_pool.close();
        assert_eq!(_worker_pool.get_worker_count(), 0);
    }
}
786}