sync_utils/worker_pool/
bounded.rs

1use std::{
2    cell::UnsafeCell,
3    future::Future,
4    mem::transmute,
5    pin::Pin,
6    sync::{
7        Arc,
8        atomic::{AtomicBool, AtomicUsize, Ordering},
9    },
10    task::*,
11    thread,
12    time::Duration,
13};
14
15use crossfire::{RecvError, SendError, TryRecvError, TrySendError, mpmc, mpsc};
16use tokio::time::{sleep, timeout};
17
18use super::*;
19
20#[allow(unused_must_use)]
21pub struct WorkerPoolBounded<
22    M: Send + Sized + Unpin + 'static,
23    W: Worker<M>,
24    S: WorkerPoolImpl<M, W>,
25>(Arc<WorkerPoolBoundedInner<M, W, S>>);
26
27struct WorkerPoolBoundedInner<M, W, S>
28where
29    M: Send + Sized + Unpin + 'static,
30    W: Worker<M>,
31    S: WorkerPoolImpl<M, W>,
32{
33    worker_count: AtomicUsize,
34    sender: UnsafeCell<Option<TxWraper<Option<M>>>>,
35    min_workers: usize,
36    max_workers: usize,
37    worker_timeout: Duration,
38    inner: S,
39    phantom: std::marker::PhantomData<W>, // to avoid complaining unused param
40    closing: AtomicBool,
41    notify_sender: mpsc::TxFuture<Option<()>, mpsc::SharedFutureBoth>,
42    notify_recv: UnsafeCell<Option<mpsc::RxFuture<Option<()>, mpsc::SharedFutureBoth>>>,
43    auto: bool,
44    channel_size: usize,
45    real_thread: AtomicBool,
46    bind_cpu: AtomicUsize,
47    max_cpu: usize,
48}
49
50unsafe impl<M, W, S> Send for WorkerPoolBoundedInner<M, W, S>
51where
52    M: Send + Sized + Unpin + 'static,
53    W: Worker<M>,
54    S: WorkerPoolImpl<M, W>,
55{
56}
57
58unsafe impl<M, W, S> Sync for WorkerPoolBoundedInner<M, W, S>
59where
60    M: Send + Sized + Unpin + 'static,
61    W: Worker<M>,
62    S: WorkerPoolImpl<M, W>,
63{
64}
65
66impl<M, W, S> Clone for WorkerPoolBounded<M, W, S>
67where
68    M: Send + Sized + Unpin + 'static,
69    W: Worker<M>,
70    S: WorkerPoolImpl<M, W>,
71{
72    #[inline]
73    fn clone(&self) -> Self {
74        Self(self.0.clone())
75    }
76}
77
78impl<M, W, S> WorkerPoolBounded<M, W, S>
79where
80    M: Send + Sized + Unpin + 'static,
81    W: Worker<M>,
82    S: WorkerPoolImpl<M, W>,
83{
84    pub fn new(
85        inner: S, min_workers: usize, max_workers: usize, channel_size: usize,
86        worker_timeout: Duration,
87    ) -> Self {
88        assert!(min_workers > 0);
89        assert!(max_workers >= min_workers);
90
91        let auto: bool = min_workers < max_workers;
92        if auto {
93            assert!(worker_timeout != ZERO_DUARTION);
94        }
95        //        if channel_size <= min_workers {
96        //            channel_size = min_workers + 1;
97        //        }
98        let (noti_sender, noti_recv) = mpsc::bounded_future_both(1);
99        let pool = Arc::new(WorkerPoolBoundedInner {
100            sender: UnsafeCell::new(None),
101            inner,
102            worker_count: AtomicUsize::new(0),
103            min_workers,
104            max_workers,
105            channel_size,
106            worker_timeout,
107            phantom: Default::default(),
108            closing: AtomicBool::new(false),
109            notify_sender: noti_sender,
110            notify_recv: UnsafeCell::new(Some(noti_recv)),
111            auto,
112            real_thread: AtomicBool::new(false),
113            bind_cpu: AtomicUsize::new(0),
114            max_cpu: num_cpus::get(),
115        });
116        Self(pool)
117    }
118
119    // If worker contains blocking logic, run worker in seperate threads
120    pub fn set_use_thread(&mut self, ok: bool) {
121        self.0.real_thread.store(ok, Ordering::Release);
122    }
123
124    pub fn start(&self) {
125        let _self = self.0.as_ref();
126        let rx;
127        if !_self.auto && _self.real_thread.load(Ordering::Acquire) {
128            let (sender, recv) = mpmc::bounded_tx_future_rx_blocking(_self.channel_size);
129            _self._sender().replace(TxWraper::Blocking(sender));
130            rx = RxWraper::Blocking(recv);
131        } else {
132            let (sender, recv) = mpmc::bounded_future_both(_self.channel_size);
133            _self._sender().replace(TxWraper::Async(sender));
134            rx = RxWraper::Async(recv);
135        }
136
137        for _ in 0.._self.min_workers {
138            self.0.clone().spawn(true, rx.clone());
139        }
140        if _self.auto {
141            let _pool = self.0.clone();
142            let notify_recv: &mut Option<mpsc::RxFuture<Option<()>, mpsc::SharedFutureBoth>> =
143                unsafe { transmute(_self.notify_recv.get()) };
144            let noti_rx = notify_recv.take().unwrap();
145            tokio::spawn(async move {
146                _pool.monitor(noti_rx, rx).await;
147            });
148        }
149    }
150
151    pub async fn close_async(&self) {
152        let _self = self.0.as_ref();
153        if _self.closing.swap(true, Ordering::SeqCst) {
154            return;
155        }
156        if _self.auto {
157            let _ = _self.notify_sender.send(None).await;
158        }
159        let sender = _self._sender().as_ref().unwrap();
160        loop {
161            let cur = self.get_worker_count();
162            if cur == 0 {
163                break;
164            }
165            debug!("worker pool closing: cur workers {}", cur);
166            for _ in 0..cur {
167                let _ = sender.send(None).await;
168            }
169            sleep(Duration::from_secs(1)).await;
170        }
171    }
172
173    // must not use in runtime
174    pub fn close(&self) {
175        if let Ok(_rt) = tokio::runtime::Handle::try_current() {
176            warn!("close in runtime thread, spawn close thread");
177            let _self = self.clone();
178            std::thread::spawn(move || {
179                let rt = tokio::runtime::Builder::new_current_thread()
180                    .enable_all()
181                    .build()
182                    .expect("runtime");
183                rt.block_on(async move {
184                    _self.close_async().await;
185                });
186            });
187        } else {
188            let rt = tokio::runtime::Builder::new_current_thread()
189                .enable_all()
190                .build()
191                .expect("runtime");
192            let _self = self.clone();
193            rt.block_on(async move {
194                _self.close_async().await;
195            });
196        }
197    }
198
199    pub fn get_worker_count(&self) -> usize {
200        self.0.get_worker_count()
201    }
202
203    pub fn get_inner(&self) -> &S {
204        &self.0.inner
205    }
206
207    #[inline]
208    pub fn try_submit(&self, msg: M) -> Option<M> {
209        let _self = self.0.as_ref();
210        if _self.closing.load(Ordering::Acquire) {
211            return Some(msg);
212        }
213        match _self._sender().as_ref().unwrap().try_send(Some(msg)) {
214            Err(TrySendError::Disconnected(m)) => {
215                return m;
216            }
217            Err(TrySendError::Full(m)) => {
218                return m;
219            }
220            Ok(_) => return None,
221        }
222    }
223
224    // return non-null on send fail
225    #[inline]
226    pub fn submit<'a>(&'a self, mut msg: M) -> SubmitFuture<'a, M> {
227        let _self = self.0.as_ref();
228        if _self.closing.load(Ordering::Acquire) {
229            return SubmitFuture { send_f: None, res: Some(Err(msg)) };
230        }
231        let sender = _self._sender().as_ref().unwrap();
232        if _self.auto {
233            match sender.try_send(Some(msg)) {
234                Err(TrySendError::Disconnected(m)) => {
235                    return SubmitFuture { send_f: None, res: Some(Err(m.unwrap())) };
236                }
237                Err(TrySendError::Full(m)) => {
238                    msg = m.unwrap();
239                }
240                Ok(_) => {
241                    return SubmitFuture { send_f: None, res: Some(Ok(())) };
242                }
243            }
244            let worker_count = _self.get_worker_count();
245            if worker_count < _self.max_workers {
246                let _ = _self.notify_sender.try_send(Some(()));
247            }
248        }
249        let send_f = sender.send(Some(msg));
250        return SubmitFuture { send_f: Some(send_f), res: None };
251    }
252}
253
254pub struct SubmitFuture<'a, M: Send + Sized + Unpin + 'static> {
255    send_f: Option<SendFutureWarper<'a, Option<M>>>,
256    res: Option<Result<(), M>>,
257}
258
259impl<'a, M: Send + Sized + Unpin + 'static> Future for SubmitFuture<'a, M> {
260    type Output = Option<M>;
261
262    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
263        let _self = self.get_mut();
264        if _self.res.is_some() {
265            match _self.res.take().unwrap() {
266                Ok(()) => return Poll::Ready(None),
267                Err(m) => return Poll::Ready(Some(m)),
268            }
269        }
270        let send_f = _self.send_f.as_mut().unwrap();
271        if let Poll::Ready(r) = Pin::new(send_f).poll(ctx) {
272            match r {
273                Ok(()) => return Poll::Ready(None),
274                Err(SendError(e)) => {
275                    return Poll::Ready(e);
276                }
277            }
278        }
279        Poll::Pending
280    }
281}
282
283#[async_trait]
284impl<M, W, S> WorkerPoolAsyncInf<M> for WorkerPoolBounded<M, W, S>
285where
286    M: Send + Sized + Unpin + 'static,
287    W: Worker<M>,
288    S: WorkerPoolImpl<M, W>,
289{
290    #[inline]
291    async fn submit(&self, msg: M) -> Option<M> {
292        self.submit(msg).await
293    }
294
295    #[inline]
296    fn try_submit(&self, msg: M) -> Option<M> {
297        self.try_submit(msg)
298    }
299}
300
301impl<M, W, S> WorkerPoolBoundedInner<M, W, S>
302where
303    M: Send + Sized + Unpin + 'static,
304    W: Worker<M>,
305    S: WorkerPoolImpl<M, W>,
306{
307    #[inline(always)]
308    fn _sender(&self) -> &mut Option<TxWraper<Option<M>>> {
309        unsafe { transmute(self.sender.get()) }
310    }
311
312    async fn run_worker_simple(&self, mut worker: W, rx: RxWraper<Option<M>>) {
313        if let Err(_) = worker.init().await {
314            let _ = self.try_exit();
315            worker.on_exit();
316            return;
317        }
318        loop {
319            match rx.recv().await {
320                Ok(item) => {
321                    if item.is_none() {
322                        let _ = self.try_exit();
323                        break;
324                    }
325                    worker.run(item.unwrap()).await;
326                }
327                Err(_) => {
328                    // channel closed worker exit
329                    let _ = self.try_exit();
330                    break;
331                }
332            }
333        }
334        worker.on_exit();
335    }
336
337    async fn run_worker_adjust(&self, mut worker: W, rx: RxWraper<Option<M>>) {
338        if let Err(_) = worker.init().await {
339            let _ = self.try_exit();
340            worker.on_exit();
341            return;
342        }
343
344        let worker_timeout = self.worker_timeout;
345        let mut is_idle = false;
346        'WORKER_LOOP: loop {
347            if is_idle {
348                match timeout(worker_timeout, rx.recv()).await {
349                    Ok(res) => {
350                        match res {
351                            Ok(item) => {
352                                if item.is_none() {
353                                    let _ = self.try_exit();
354                                    break 'WORKER_LOOP;
355                                }
356                                worker.run(item.unwrap()).await;
357                                is_idle = false;
358                            }
359                            Err(_) => {
360                                // channel closed worker exit
361                                let _ = self.try_exit();
362                                worker.on_exit();
363                            }
364                        }
365                    }
366                    Err(_) => {
367                        // timeout
368                        if self.try_exit() {
369                            break 'WORKER_LOOP;
370                        }
371                    }
372                }
373            } else {
374                match rx.try_recv() {
375                    Err(e) => {
376                        if e.is_empty() {
377                            is_idle = true;
378                        } else {
379                            let _ = self.try_exit();
380                            break 'WORKER_LOOP;
381                        }
382                    }
383                    Ok(Some(item)) => {
384                        worker.run(item).await;
385                        is_idle = false;
386                    }
387                    Ok(None) => {
388                        let _ = self.try_exit();
389                        break 'WORKER_LOOP;
390                    }
391                }
392            }
393        }
394        worker.on_exit();
395    }
396
397    #[inline(always)]
398    pub fn get_worker_count(&self) -> usize {
399        self.worker_count.load(Ordering::Acquire)
400    }
401
402    #[inline(always)]
403    fn spawn(self: Arc<Self>, initial: bool, rx: RxWraper<Option<M>>) {
404        self.worker_count.fetch_add(1, Ordering::SeqCst);
405        let worker = self.inner.spawn();
406        let _self = self.clone();
407        if self.real_thread.load(Ordering::Acquire) {
408            let mut bind_cpu: Option<usize> = None;
409            if _self.bind_cpu.load(Ordering::Acquire) <= _self.max_cpu {
410                let cpu = _self.bind_cpu.fetch_add(1, Ordering::SeqCst);
411                if cpu < _self.max_cpu {
412                    bind_cpu = Some(cpu as usize);
413                }
414            }
415            thread::spawn(move || {
416                if let Some(cpu) = bind_cpu {
417                    core_affinity::set_for_current(core_affinity::CoreId { id: cpu });
418                }
419                let rt = tokio::runtime::Builder::new_current_thread()
420                    .enable_all()
421                    .build()
422                    .expect("runtime");
423                rt.block_on(async move {
424                    if initial || !_self.auto {
425                        _self.run_worker_simple(worker, rx).await
426                    } else {
427                        _self.run_worker_adjust(worker, rx).await
428                    }
429                });
430            });
431        } else {
432            tokio::spawn(async move {
433                if initial || !_self.auto {
434                    _self.run_worker_simple(worker, rx).await
435                } else {
436                    _self.run_worker_adjust(worker, rx).await
437                }
438            });
439        }
440    }
441
442    // check if idle worker should exit
443    #[inline(always)]
444    fn try_exit(&self) -> bool {
445        if self.closing.load(Ordering::Acquire) {
446            self.worker_count.fetch_sub(1, Ordering::SeqCst);
447            return true;
448        }
449        if self.get_worker_count() > self.min_workers {
450            if self.worker_count.fetch_sub(1, Ordering::SeqCst) <= self.min_workers {
451                self.worker_count.fetch_add(1, Ordering::SeqCst); // rollback
452            } else {
453                return true; // worker exit
454            }
455        }
456        return false;
457    }
458
459    async fn monitor(
460        self: Arc<Self>, noti: mpsc::RxFuture<Option<()>, mpsc::SharedFutureBoth>,
461        rx: RxWraper<Option<M>>,
462    ) {
463        let _self = self.as_ref();
464        loop {
465            match timeout(Duration::from_secs(1), noti.recv()).await {
466                Err(_) => {
467                    if _self.closing.load(Ordering::Acquire) {
468                        return;
469                    }
470                    continue;
471                }
472                Ok(Ok(Some(_))) => {
473                    if _self.closing.load(Ordering::Acquire) {
474                        return;
475                    }
476                    let worker_count = _self.get_worker_count();
477                    if worker_count > _self.max_workers {
478                        continue;
479                    }
480                    self.clone().spawn(false, rx.clone());
481                }
482                _ => {
483                    println!("monitor exit");
484                    return;
485                }
486            }
487        }
488    }
489}
490
491enum TxWraper<M: Send + Sized + Unpin + 'static> {
492    Blocking(mpmc::TxFuture<M, mpmc::SharedSenderFRecvB>),
493    Async(mpmc::TxFuture<M, mpmc::SharedFutureBoth>),
494}
495
496enum RxWraper<M: Send + Sized + Unpin + 'static> {
497    Blocking(mpmc::RxBlocking<M, mpmc::SharedSenderFRecvB>),
498    Async(mpmc::RxFuture<M, mpmc::SharedFutureBoth>),
499}
500
501pub enum SendFutureWarper<'a, M: Send + Sized + Unpin + 'static> {
502    Blocking(mpmc::SendFuture<'a, M, mpmc::SharedSenderFRecvB>),
503    Async(mpmc::SendFuture<'a, M, mpmc::SharedFutureBoth>),
504}
505
506impl<'a, M: Send + Sized + Unpin + 'static> Future for SendFutureWarper<'a, M> {
507    type Output = Result<(), SendError<M>>;
508
509    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
510        let _self = self.get_mut();
511        match _self {
512            Self::Blocking(s) => Pin::new(s).poll(ctx),
513            Self::Async(s) => Pin::new(s).poll(ctx),
514        }
515    }
516}
517
518impl<M: Send + Sized + Unpin + 'static> TxWraper<M> {
519    #[inline(always)]
520    fn try_send(&self, m: M) -> Result<(), TrySendError<M>> {
521        match self {
522            Self::Blocking(s) => s.try_send(m),
523            Self::Async(s) => s.try_send(m),
524        }
525    }
526
527    #[inline(always)]
528    fn send<'a>(&'a self, m: M) -> SendFutureWarper<'a, M> {
529        match self {
530            Self::Blocking(s) => SendFutureWarper::Blocking(s.make_send_future(m)),
531            Self::Async(s) => SendFutureWarper::Async(s.make_send_future(m)),
532        }
533    }
534}
535
536impl<M: Send + Sized + Unpin + 'static> RxWraper<M> {
537    #[inline(always)]
538    async fn recv(&self) -> Result<M, RecvError> {
539        match self {
540            Self::Blocking(s) => s.recv(),
541            Self::Async(s) => s.recv().await,
542        }
543    }
544
545    #[inline(always)]
546    fn try_recv(&self) -> Result<M, TryRecvError> {
547        match self {
548            Self::Blocking(s) => s.try_recv(),
549            Self::Async(s) => s.try_recv(),
550        }
551    }
552}
553
554impl<M: Send + Sized + Unpin + 'static> Clone for RxWraper<M> {
555    fn clone(&self) -> Self {
556        match self {
557            Self::Blocking(s) => Self::Blocking(s.clone()),
558            Self::Async(s) => Self::Async(s.clone()),
559        }
560    }
561}
562
#[cfg(test)]
mod tests {

    use crossfire::mpsc;
    use tokio::time::{Duration, sleep};

    use super::*;

    #[allow(dead_code)]
    struct MyWorkerPoolImpl();

    struct MyWorker();

    /// Test message: a number plus a channel used to report completion.
    struct MyMsg(i64, mpsc::TxFuture<(), mpsc::SharedFutureBoth>);

    impl WorkerPoolImpl<MyMsg, MyWorker> for MyWorkerPoolImpl {
        fn spawn(&self) -> MyWorker {
            MyWorker()
        }
    }

    #[async_trait]
    impl Worker<MyMsg> for MyWorker {
        async fn run(&mut self, msg: MyMsg) {
            sleep(Duration::from_millis(1)).await;
            println!("done {}", msg.0);
            let _ = msg.1.send(()).await;
        }
    }

    type MyWorkerPool = WorkerPoolBounded<MyMsg, MyWorker, MyWorkerPoolImpl>;

    /// Multi-threaded runtime with two worker threads, shared by all tests.
    fn new_test_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .worker_threads(2)
            .build()
            .unwrap()
    }

    /// Scenario shared by every test: start the pool, flood it from five
    /// concurrent submitters, check that it shrinks back to `min_workers`
    /// once idle, then push ten more messages one at a time.
    async fn exercise_pool<W, S>(
        pool: &WorkerPoolBounded<MyMsg, W, S>, min_workers: usize, max_workers: usize,
        worker_timeout: Duration,
    ) where
        W: Worker<MyMsg> + 'static,
        S: WorkerPoolImpl<MyMsg, W> + 'static,
    {
        pool.start();
        let submitters: Vec<_> = (0..5)
            .map(|i| {
                let _pool = pool.clone();
                tokio::task::spawn(async move {
                    let (done_tx, done_rx) = mpsc::bounded_future_both(10);
                    for j in 0..2 {
                        let m = i * 10 + j;
                        println!("submit {} in {}/{}", m, j, i);
                        _pool.submit(MyMsg(m, done_tx.clone())).await;
                    }
                    for _j in 0..2 {
                        let _ = done_rx.recv().await;
                    }
                })
            })
            .collect();
        for submitter in submitters {
            let _ = submitter.await;
        }
        let workers = pool.get_worker_count();
        println!("cur workers {} might reach max {}", workers, max_workers);

        // Give the extra workers time to hit their idle timeout.
        sleep(worker_timeout * 2).await;
        let workers = pool.get_worker_count();
        println!("cur workers: {}, extra should exit due to timeout", workers);
        assert_eq!(workers, min_workers);

        let (done_tx, done_rx) = mpsc::bounded_future_both(1);
        for j in 0..10 {
            pool.submit(MyMsg(80 + j, done_tx.clone())).await;
            let _ = done_rx.recv().await;
        }
    }

    /// `close()` invoked from inside the runtime: the close runs on a helper
    /// thread, so the test sleeps before checking the worker count.
    #[test]
    fn bounded_workerpool_adjust_close_async() {
        let min_workers = 1;
        let max_workers = 4;
        let worker_timeout = Duration::from_secs(1);
        let rt = new_test_runtime();
        let worker_pool =
            MyWorkerPool::new(MyWorkerPoolImpl(), min_workers, max_workers, 1, worker_timeout);
        let _worker_pool = worker_pool.clone();
        rt.block_on(async move {
            exercise_pool(&worker_pool, min_workers, max_workers, worker_timeout).await;
            println!("closing");
            _worker_pool.close();
            sleep(Duration::from_secs(2)).await;
            assert_eq!(_worker_pool.get_worker_count(), 0);
        });
    }

    /// `close()` invoked outside the runtime: it blocks until all workers are gone.
    #[test]
    fn bounded_workerpool_adjust_close() {
        let min_workers = 1;
        let max_workers = 4;
        let worker_timeout = Duration::from_secs(1);
        let rt = new_test_runtime();
        let worker_pool =
            MyWorkerPool::new(MyWorkerPoolImpl(), min_workers, max_workers, 1, worker_timeout);
        let _worker_pool = worker_pool.clone();
        rt.block_on(async move {
            exercise_pool(&worker_pool, min_workers, max_workers, worker_timeout).await;
        });
        println!("closing");
        _worker_pool.close();
        assert_eq!(_worker_pool.get_worker_count(), 0);
    }

    #[allow(dead_code)]
    struct MyBlockingWorkerPoolImpl();

    struct MyBlockingWorker();

    impl WorkerPoolImpl<MyMsg, MyBlockingWorker> for MyBlockingWorkerPoolImpl {
        fn spawn(&self) -> MyBlockingWorker {
            MyBlockingWorker()
        }
    }

    #[async_trait]
    impl Worker<MyMsg> for MyBlockingWorker {
        async fn run(&mut self, msg: MyMsg) {
            // Deliberately blocks the thread: this worker is meant for thread mode.
            std::thread::sleep(Duration::from_millis(1));
            println!("done {}", msg.0);
            let _ = msg.1.send(()).await;
        }
    }

    type MyBlockingWorkerPool =
        WorkerPoolBounded<MyMsg, MyBlockingWorker, MyBlockingWorkerPoolImpl>;

    /// Same scenario with every worker running on its own OS thread.
    #[test]
    fn bounded_workerpool_adjust_real_thread() {
        let min_workers = 1;
        let max_workers = 4;
        let worker_timeout = Duration::from_secs(1);
        let rt = new_test_runtime();
        let mut worker_pool = MyBlockingWorkerPool::new(
            MyBlockingWorkerPoolImpl(),
            min_workers,
            max_workers,
            1,
            worker_timeout,
        );
        worker_pool.set_use_thread(true);
        let _worker_pool = worker_pool.clone();
        rt.block_on(async move {
            exercise_pool(&worker_pool, min_workers, max_workers, worker_timeout).await;
        });
        println!("closing");
        _worker_pool.close();
        assert_eq!(_worker_pool.get_worker_count(), 0);
    }
}