sync_utils/worker_pool/
bounded.rs

use std::{
    cell::UnsafeCell,
    future::Future,
    mem::transmute,
    pin::Pin,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicUsize, Ordering},
    },
    task::*,
    thread,
    time::Duration,
};

use crossfire::{mpmc, mpsc};
use tokio::time::{sleep, timeout};

use super::*;

#[allow(unused_must_use)]
pub struct WorkerPoolBounded<
    M: Send + Sized + Unpin + 'static,
    W: Worker<M>,
    S: WorkerPoolImpl<M, W>,
>(Arc<WorkerPoolBoundedInner<M, W, S>>);

struct WorkerPoolBoundedInner<M, W, S>
where
    M: Send + Sized + Unpin + 'static,
    W: Worker<M>,
    S: WorkerPoolImpl<M, W>,
{
    worker_count: AtomicUsize,
    sender: UnsafeCell<Option<TxWraper<Option<M>>>>,
    min_workers: usize,
    max_workers: usize,
    worker_timeout: Duration,
    inner: S,
    phantom: std::marker::PhantomData<W>, // marks the worker type W as used without storing one
    closing: AtomicBool,
    notify_sender: mpsc::TxFuture<Option<()>, mpsc::SharedFutureBoth>,
    notify_recv: UnsafeCell<Option<mpsc::RxFuture<Option<()>, mpsc::SharedFutureBoth>>>,
    auto: bool,
    channel_size: usize,
    real_thread: AtomicBool,
    bind_cpu: AtomicUsize,
    max_cpu: usize,
}

unsafe impl<M, W, S> Send for WorkerPoolBoundedInner<M, W, S>
where
    M: Send + Sized + Unpin + 'static,
    W: Worker<M>,
    S: WorkerPoolImpl<M, W>,
{
}

unsafe impl<M, W, S> Sync for WorkerPoolBoundedInner<M, W, S>
where
    M: Send + Sized + Unpin + 'static,
    W: Worker<M>,
    S: WorkerPoolImpl<M, W>,
{
}

impl<M, W, S> Clone for WorkerPoolBounded<M, W, S>
where
    M: Send + Sized + Unpin + 'static,
    W: Worker<M>,
    S: WorkerPoolImpl<M, W>,
{
    #[inline]
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<M, W, S> WorkerPoolBounded<M, W, S>
where
    M: Send + Sized + Unpin + 'static,
    W: Worker<M>,
    S: WorkerPoolImpl<M, W>,
{
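    /// Creates a pool that keeps at least `min_workers` and at most `max_workers`
    /// workers, fed through a bounded channel of capacity `channel_size`.
    ///
    /// When `min_workers < max_workers` the pool auto-scales, and `worker_timeout`
    /// (which must then be non-zero) bounds how long an extra worker may stay idle
    /// before it exits. No workers are spawned until [`start`](Self::start) is called.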
    pub fn new(
        inner: S,
        min_workers: usize,
        max_workers: usize,
        channel_size: usize,
        worker_timeout: Duration,
    ) -> Self {
        assert!(min_workers > 0);
        assert!(max_workers >= min_workers);

        let auto: bool = min_workers < max_workers;
        if auto {
            assert!(worker_timeout != ZERO_DUARTION);
        }
        //        if channel_size <= min_workers {
        //            channel_size = min_workers + 1;
        //        }
        let (noti_sender, noti_recv) = mpsc::bounded_future_both(1);
        let pool = Arc::new(WorkerPoolBoundedInner {
            sender: UnsafeCell::new(None),
            inner,
            worker_count: AtomicUsize::new(0),
            min_workers,
            max_workers,
            channel_size,
            worker_timeout,
            phantom: Default::default(),
            closing: AtomicBool::new(false),
            notify_sender: noti_sender,
            notify_recv: UnsafeCell::new(Some(noti_recv)),
            auto,
            real_thread: AtomicBool::new(false),
            bind_cpu: AtomicUsize::new(0),
            max_cpu: num_cpus::get(),
        });
        Self(pool)
    }

    // If the worker contains blocking logic, run each worker on a separate thread.
    pub fn set_use_thread(&mut self, ok: bool) {
        self.0.real_thread.store(ok, Ordering::Release);
    }

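    /// Creates the message channel and spawns the initial `min_workers` workers.
    /// In auto-scaling mode it also spawns a monitor task that grows the pool on
    /// demand. Call this exactly once, typically from inside a tokio runtime,
    /// before submitting work.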
    pub fn start(&self) {
        let _self = self.0.as_ref();
        let rx;
        if !_self.auto && _self.real_thread.load(Ordering::Acquire) {
            let (sender, recv) = mpmc::bounded_tx_future_rx_blocking(_self.channel_size);
            _self._sender().replace(TxWraper::Blocking(sender));
            rx = RxWraper::Blocking(recv);
        } else {
            let (sender, recv) = mpmc::bounded_future_both(_self.channel_size);
            _self._sender().replace(TxWraper::Async(sender));
            rx = RxWraper::Async(recv);
        }

        for _ in 0.._self.min_workers {
            self.0.clone().spawn(true, rx.clone());
        }
        if _self.auto {
            let _pool = self.0.clone();
            let notify_recv: &mut Option<mpsc::RxFuture<Option<()>, mpsc::SharedFutureBoth>> =
                unsafe { transmute(_self.notify_recv.get()) };
            let noti_rx = notify_recv.take().unwrap();
            tokio::spawn(async move {
                _pool.monitor(noti_rx, rx).await;
            });
        }
    }

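    /// Signals the pool to close and waits until every worker has exited.
    /// A `None` message is sent to each worker as a stop signal, repeated once per
    /// second until the worker count drops to zero.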
    pub async fn close_async(&self) {
        let _self = self.0.as_ref();
        if _self.closing.swap(true, Ordering::SeqCst) {
            return;
        }
        if _self.auto {
            let _ = _self.notify_sender.send(None).await;
        }
        let sender = _self._sender().as_ref().unwrap();
        loop {
            let cur = self.get_worker_count();
            if cur == 0 {
                break;
            }
            debug!("worker pool closing: cur workers {}", cur);
            for _ in 0..cur {
                let _ = sender.send(None).await;
            }
            sleep(Duration::from_secs(1)).await;
        }
    }

    // Blocking shutdown, intended to be called outside an async runtime. If called
    // from a runtime thread, the shutdown is delegated to a helper thread without waiting.
    pub fn close(&self) {
        if let Ok(_rt) = tokio::runtime::Handle::try_current() {
            warn!("close in runtime thread, spawn close thread");
            let _self = self.clone();
            std::thread::spawn(move || {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .expect("runtime");
                rt.block_on(async move {
                    _self.close_async().await;
                });
            });
        } else {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("runtime");
            let _self = self.clone();
            rt.block_on(async move {
                _self.close_async().await;
            });
        }
    }

    pub fn get_worker_count(&self) -> usize {
        self.0.get_worker_count()
    }

    pub fn get_inner(&self) -> &S {
        &self.0.inner
    }

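    /// Non-blocking submit. Returns `None` if the message was queued, or gives the
    /// message back as `Some(msg)` when the pool is closing or the channel is full
    /// or disconnected.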
    #[inline]
    pub fn try_submit(&self, msg: M) -> Option<M> {
        let _self = self.0.as_ref();
        if _self.closing.load(Ordering::Acquire) {
            return Some(msg);
        }
        match _self._sender().as_ref().unwrap().try_send(Some(msg)) {
            Err(mpmc::TrySendError::Disconnected(m)) => {
                return m;
            }
            Err(mpmc::TrySendError::Full(m)) => {
                return m;
            }
            Ok(_) => return None,
        }
    }

    /// Submits a message to the pool. The returned future resolves to `None` on
    /// success, or gives the message back as `Some(msg)` when the pool is closing
    /// or the channel has been disconnected.
    #[inline]
    pub fn submit<'a>(&'a self, mut msg: M) -> SubmitFuture<'a, M> {
        let _self = self.0.as_ref();
        if _self.closing.load(Ordering::Acquire) {
            return SubmitFuture {
                send_f: None,
                res: Some(Err(msg)),
            };
        }
        let sender = _self._sender().as_ref().unwrap();
        if _self.auto {
            match sender.try_send(Some(msg)) {
                Err(mpmc::TrySendError::Disconnected(m)) => {
                    return SubmitFuture {
                        send_f: None,
                        res: Some(Err(m.unwrap())),
                    };
                }
                Err(mpmc::TrySendError::Full(m)) => {
                    msg = m.unwrap();
                }
                Ok(_) => {
                    return SubmitFuture {
                        send_f: None,
                        res: Some(Ok(())),
                    };
                }
            }
            let worker_count = _self.get_worker_count();
            if worker_count < _self.max_workers {
                let _ = _self.notify_sender.try_send(Some(()));
            }
        }
        let send_f = sender.send(Some(msg));
        return SubmitFuture {
            send_f: Some(send_f),
            res: None,
        };
    }
}

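/// Future returned by [`WorkerPoolBounded::submit`]: resolves to `None` once the
/// message has been queued, or to `Some(msg)` handing the message back when the
/// channel is disconnected or the pool is closing.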
pub struct SubmitFuture<'a, M: Send + Sized + Unpin + 'static> {
    send_f: Option<SendFutureWarper<'a, Option<M>>>,
    res: Option<Result<(), M>>,
}

impl<'a, M: Send + Sized + Unpin + 'static> Future for SubmitFuture<'a, M> {
    type Output = Option<M>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        let _self = self.get_mut();
        if _self.res.is_some() {
            match _self.res.take().unwrap() {
                Ok(()) => return Poll::Ready(None),
                Err(m) => return Poll::Ready(Some(m)),
            }
        }
        let send_f = _self.send_f.as_mut().unwrap();
        if let Poll::Ready(r) = Pin::new(send_f).poll(ctx) {
            match r {
                Ok(()) => return Poll::Ready(None),
                Err(mpmc::SendError(e)) => {
                    return Poll::Ready(e);
                }
            }
        }
        Poll::Pending
    }
}

#[async_trait]
impl<M, W, S> WorkerPoolAsyncInf<M> for WorkerPoolBounded<M, W, S>
where
    M: Send + Sized + Unpin + 'static,
    W: Worker<M>,
    S: WorkerPoolImpl<M, W>,
{
    #[inline]
    async fn submit(&self, msg: M) -> Option<M> {
        self.submit(msg).await
    }

    #[inline]
    fn try_submit(&self, msg: M) -> Option<M> {
        self.try_submit(msg)
    }
}

impl<M, W, S> WorkerPoolBoundedInner<M, W, S>
where
    M: Send + Sized + Unpin + 'static,
    W: Worker<M>,
    S: WorkerPoolImpl<M, W>,
{
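    // `sender` sits in an UnsafeCell so `start()` can install it behind a shared
    // reference; this relies on `start()` being called once, before any concurrent
    // submit/close, which is how the pool is used in the tests below.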
    #[inline(always)]
    fn _sender(&self) -> &mut Option<TxWraper<Option<M>>> {
        unsafe { transmute(self.sender.get()) }
    }

    async fn run_worker_simple(&self, mut worker: W, rx: RxWraper<Option<M>>) {
        if let Err(_) = worker.init().await {
            let _ = self.try_exit();
            worker.on_exit();
            return;
        }
        loop {
            match rx.recv().await {
                Ok(item) => {
                    if item.is_none() {
                        let _ = self.try_exit();
                        break;
                    }
                    worker.run(item.unwrap()).await;
                }
                Err(_) => {
                    // channel closed, worker exits
                    let _ = self.try_exit();
                    break;
                }
            }
        }
        worker.on_exit();
    }

    async fn run_worker_adjust(&self, mut worker: W, rx: RxWraper<Option<M>>) {
        if let Err(_) = worker.init().await {
            let _ = self.try_exit();
            worker.on_exit();
            return;
        }

        let worker_timeout = self.worker_timeout;
        let mut is_idle = false;
        'WORKER_LOOP: loop {
            if is_idle {
                match timeout(worker_timeout, rx.recv()).await {
                    Ok(res) => {
                        match res {
                            Ok(item) => {
                                if item.is_none() {
                                    let _ = self.try_exit();
                                    break 'WORKER_LOOP;
                                }
                                worker.run(item.unwrap()).await;
                                is_idle = false;
                            }
                            Err(_) => {
                                // channel closed, worker exits (on_exit runs once after the loop)
                                let _ = self.try_exit();
                                break 'WORKER_LOOP;
                            }
                        }
                    }
                    Err(_) => {
                        // idle timeout: exit if the pool is above min_workers
                        if self.try_exit() {
                            break 'WORKER_LOOP;
                        }
                    }
                }
            } else {
                match rx.try_recv() {
                    Err(e) => {
                        if e.is_empty() {
                            is_idle = true;
                        } else {
                            let _ = self.try_exit();
                            break 'WORKER_LOOP;
                        }
                    }
                    Ok(Some(item)) => {
                        worker.run(item).await;
                        is_idle = false;
                    }
                    Ok(None) => {
                        let _ = self.try_exit();
                        break 'WORKER_LOOP;
                    }
                }
            }
        }
        worker.on_exit();
    }

    #[inline(always)]
    pub fn get_worker_count(&self) -> usize {
        self.worker_count.load(Ordering::Acquire)
    }

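    // Spawns one worker. Initial workers (and all workers when not auto-scaling) run
    // the simple loop; extra workers run the adjusting loop so they can exit after an
    // idle timeout. With `real_thread` set, each worker gets a dedicated thread with
    // its own current-thread runtime, pinned to a CPU core while unclaimed cores remain.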
    #[inline(always)]
    fn spawn(self: Arc<Self>, initial: bool, rx: RxWraper<Option<M>>) {
        self.worker_count.fetch_add(1, Ordering::SeqCst);
        let worker = self.inner.spawn();
        let _self = self.clone();
        if self.real_thread.load(Ordering::Acquire) {
            let mut bind_cpu: Option<usize> = None;
            if _self.bind_cpu.load(Ordering::Acquire) <= _self.max_cpu {
                let cpu = _self.bind_cpu.fetch_add(1, Ordering::SeqCst);
                if cpu < _self.max_cpu {
                    bind_cpu = Some(cpu as usize);
                }
            }
            thread::spawn(move || {
                if let Some(cpu) = bind_cpu {
                    core_affinity::set_for_current(core_affinity::CoreId { id: cpu });
                }
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .expect("runtime");
                rt.block_on(async move {
                    if initial || !_self.auto {
                        _self.run_worker_simple(worker, rx).await
                    } else {
                        _self.run_worker_adjust(worker, rx).await
                    }
                });
            });
        } else {
            tokio::spawn(async move {
                if initial || !_self.auto {
                    _self.run_worker_simple(worker, rx).await
                } else {
                    _self.run_worker_adjust(worker, rx).await
                }
            });
        }
    }

    // Decides whether the calling worker should exit: always when the pool is closing,
    // otherwise only while the worker count stays above min_workers. Returns true when
    // the worker may exit (its slot has already been released).
    #[inline(always)]
    fn try_exit(&self) -> bool {
        if self.closing.load(Ordering::Acquire) {
            self.worker_count.fetch_sub(1, Ordering::SeqCst);
            return true;
        }
        if self.get_worker_count() > self.min_workers {
            if self.worker_count.fetch_sub(1, Ordering::SeqCst) <= self.min_workers {
                self.worker_count.fetch_add(1, Ordering::SeqCst); // rollback
            } else {
                return true; // worker exit
            }
        }
        return false;
    }

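    // Auto-scaling monitor: wakes on notifications from `submit` and spawns an extra
    // worker unless the pool is already at `max_workers`; exits when the pool closes.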
    async fn monitor(
        self: Arc<Self>,
        noti: mpsc::RxFuture<Option<()>, mpsc::SharedFutureBoth>,
        rx: RxWraper<Option<M>>,
    ) {
        let _self = self.as_ref();
        loop {
            match timeout(Duration::from_secs(1), noti.recv()).await {
                Err(_) => {
                    if _self.closing.load(Ordering::Acquire) {
                        return;
                    }
                    continue;
                }
                Ok(Ok(Some(_))) => {
                    if _self.closing.load(Ordering::Acquire) {
                        return;
                    }
                    let worker_count = _self.get_worker_count();
                    if worker_count >= _self.max_workers {
                        // already at the limit
                        continue;
                    }
                    self.clone().spawn(false, rx.clone());
                }
                _ => {
                    debug!("monitor exit");
                    return;
                }
            }
        }
    }
}

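// The wrapper types below unify the two crossfire channel flavours the pool uses:
// a fully async channel, and a future-sender / blocking-receiver channel used when
// workers run on dedicated threads.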
enum TxWraper<M: Send + Sized + Unpin + 'static> {
    Blocking(mpmc::TxFuture<M, mpmc::SharedSenderFRecvB>),
    Async(mpmc::TxFuture<M, mpmc::SharedFutureBoth>),
}

enum RxWraper<M: Send + Sized + Unpin + 'static> {
    Blocking(mpmc::RxBlocking<M, mpmc::SharedSenderFRecvB>),
    Async(mpmc::RxFuture<M, mpmc::SharedFutureBoth>),
}

pub enum SendFutureWarper<'a, M: Send + Sized + Unpin + 'static> {
    Blocking(mpmc::SendFuture<'a, M, mpmc::SharedSenderFRecvB>),
    Async(mpmc::SendFuture<'a, M, mpmc::SharedFutureBoth>),
}

impl<'a, M: Send + Sized + Unpin + 'static> Future for SendFutureWarper<'a, M> {
    type Output = Result<(), mpmc::SendError<M>>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        let _self = self.get_mut();
        match _self {
            Self::Blocking(s) => Pin::new(s).poll(ctx),
            Self::Async(s) => Pin::new(s).poll(ctx),
        }
    }
}

impl<M: Send + Sized + Unpin + 'static> TxWraper<M> {
    #[inline(always)]
    fn try_send(&self, m: M) -> Result<(), mpmc::TrySendError<M>> {
        match self {
            Self::Blocking(s) => s.try_send(m),
            Self::Async(s) => s.try_send(m),
        }
    }

    #[inline(always)]
    fn send<'a>(&'a self, m: M) -> SendFutureWarper<'a, M> {
        match self {
            Self::Blocking(s) => SendFutureWarper::Blocking(s.make_send_future(m)),
            Self::Async(s) => SendFutureWarper::Async(s.make_send_future(m)),
        }
    }
}

impl<M: Send + Sized + Unpin + 'static> RxWraper<M> {
    #[inline(always)]
    async fn recv(&self) -> Result<M, mpmc::RecvError> {
        match self {
            Self::Blocking(s) => s.recv(),
            Self::Async(s) => s.recv().await,
        }
    }

    #[inline(always)]
    fn try_recv(&self) -> Result<M, mpmc::TryRecvError> {
        match self {
            Self::Blocking(s) => s.try_recv(),
            Self::Async(s) => s.try_recv(),
        }
    }
}

impl<M: Send + Sized + Unpin + 'static> Clone for RxWraper<M> {
    fn clone(&self) -> Self {
        match self {
            Self::Blocking(s) => Self::Blocking(s.clone()),
            Self::Async(s) => Self::Async(s.clone()),
        }
    }
}

#[cfg(test)]
mod tests {

    use crossfire::mpsc;
    use tokio::time::{Duration, sleep};

    use super::*;

    #[allow(dead_code)]
    struct MyWorkerPoolImpl();

    struct MyWorker();

    struct MyMsg(i64, mpsc::TxFuture<(), mpsc::SharedFutureBoth>);

    impl WorkerPoolImpl<MyMsg, MyWorker> for MyWorkerPoolImpl {
        fn spawn(&self) -> MyWorker {
            MyWorker()
        }
    }

    #[async_trait]
    impl Worker<MyMsg> for MyWorker {
        async fn run(&mut self, msg: MyMsg) {
            sleep(Duration::from_millis(1)).await;
            println!("done {}", msg.0);
            let _ = msg.1.send(()).await;
        }
    }

    type MyWorkerPool = WorkerPoolBounded<MyMsg, MyWorker, MyWorkerPoolImpl>;

    #[test]
    fn bounded_workerpool_adjust_close_async() {
        let min_workers = 1;
        let max_workers = 4;
        let worker_timeout = Duration::from_secs(1);
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .worker_threads(2)
            .build()
            .unwrap();
        let worker_pool = MyWorkerPool::new(
            MyWorkerPoolImpl(),
            min_workers,
            max_workers,
            1,
            worker_timeout,
        );
        let _worker_pool = worker_pool.clone();
        rt.block_on(async move {
            worker_pool.start();
            let mut ths = Vec::new();
            for i in 0..5 {
                let _pool = worker_pool.clone();
                ths.push(tokio::task::spawn(async move {
                    let (done_tx, done_rx) = mpsc::bounded_future_both(10);
                    for j in 0..2 {
                        let m = i * 10 + j;
                        println!("submit {} in {}/{}", m, j, i);
                        _pool.submit(MyMsg(m, done_tx.clone())).await;
                    }
                    for _j in 0..2 {
                        //println!("sender {} recv {}", i, _j);
                        let _ = done_rx.recv().await;
                    }
                }));
            }
            for th in ths {
                let _ = th.await;
            }
            let workers = worker_pool.get_worker_count();
            println!("cur workers {} might reach max {}", workers, max_workers);
            //assert_eq!(workers, max_workers);

            sleep(worker_timeout * 2).await;
            let workers = worker_pool.get_worker_count();
            println!("cur workers: {}, extra should exit due to timeout", workers);
            assert_eq!(workers, min_workers);

            let (done_tx, done_rx) = mpsc::bounded_future_both(1);
            for j in 0..10 {
                worker_pool.submit(MyMsg(80 + j, done_tx.clone())).await;
                let _ = done_rx.recv().await;
            }
            println!("closing");
            _worker_pool.close();
            sleep(Duration::from_secs(2)).await;
            assert_eq!(_worker_pool.get_worker_count(), 0);
        });
    }

    #[test]
    fn bounded_workerpool_adjust_close() {
        let min_workers = 1;
        let max_workers = 4;
        let worker_timeout = Duration::from_secs(1);
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .worker_threads(2)
            .build()
            .unwrap();
        let worker_pool = MyWorkerPool::new(
            MyWorkerPoolImpl(),
            min_workers,
            max_workers,
            1,
            worker_timeout,
        );
        let _worker_pool = worker_pool.clone();
        rt.block_on(async move {
            worker_pool.start();
            let mut ths = Vec::new();
            for i in 0..5 {
                let _pool = worker_pool.clone();
                ths.push(tokio::task::spawn(async move {
                    let (done_tx, done_rx) = mpsc::bounded_future_both(10);
                    for j in 0..2 {
                        let m = i * 10 + j;
                        println!("submit {} in {}/{}", m, j, i);
                        _pool.submit(MyMsg(m, done_tx.clone())).await;
                    }
                    for _j in 0..2 {
                        //println!("sender {} recv {}", i, _j);
                        let _ = done_rx.recv().await;
                    }
                }));
            }
            for th in ths {
                let _ = th.await;
            }
            let workers = worker_pool.get_worker_count();
            println!("cur workers {} might reach max {}", workers, max_workers);
            //assert_eq!(workers, max_workers);

            sleep(worker_timeout * 2).await;
            let workers = worker_pool.get_worker_count();
            println!("cur workers: {}, extra should exit due to timeout", workers);
            assert_eq!(workers, min_workers);

            let (done_tx, done_rx) = mpsc::bounded_future_both(1);
            for j in 0..10 {
                worker_pool.submit(MyMsg(80 + j, done_tx.clone())).await;
                let _ = done_rx.recv().await;
            }
        });
        println!("closing");
        _worker_pool.close();
        assert_eq!(_worker_pool.get_worker_count(), 0);
    }

    #[allow(dead_code)]
    struct MyBlockingWorkerPoolImpl();

    struct MyBlockingWorker();

    impl WorkerPoolImpl<MyMsg, MyBlockingWorker> for MyBlockingWorkerPoolImpl {
        fn spawn(&self) -> MyBlockingWorker {
            MyBlockingWorker()
        }
    }

    #[async_trait]
    impl Worker<MyMsg> for MyBlockingWorker {
        async fn run(&mut self, msg: MyMsg) {
            std::thread::sleep(Duration::from_millis(1));
            println!("done {}", msg.0);
            let _ = msg.1.send(()).await;
        }
    }

    type MyBlockingWorkerPool =
        WorkerPoolBounded<MyMsg, MyBlockingWorker, MyBlockingWorkerPoolImpl>;

    #[test]
    fn bounded_workerpool_adjust_real_thread() {
        let min_workers = 1;
        let max_workers = 4;
        let worker_timeout = Duration::from_secs(1);
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .worker_threads(2)
            .build()
            .unwrap();
        let mut worker_pool = MyBlockingWorkerPool::new(
            MyBlockingWorkerPoolImpl(),
            min_workers,
            max_workers,
            1,
            worker_timeout,
        );
        worker_pool.set_use_thread(true);
        let _worker_pool = worker_pool.clone();
        rt.block_on(async move {
            worker_pool.start();
            let mut ths = Vec::new();
            for i in 0..5 {
                let _pool = worker_pool.clone();
                ths.push(tokio::task::spawn(async move {
                    let (done_tx, done_rx) = mpsc::bounded_future_both(10);
                    for j in 0..2 {
                        let m = i * 10 + j;
                        println!("submit {} in {}/{}", m, j, i);
                        _pool.submit(MyMsg(m, done_tx.clone())).await;
                    }
                    for _j in 0..2 {
                        //println!("sender {} recv {}", i, _j);
                        let _ = done_rx.recv().await;
                    }
                }));
            }
            for th in ths {
                let _ = th.await;
            }
            let workers = worker_pool.get_worker_count();
            println!("cur workers {} might reach max {}", workers, max_workers);
            //assert_eq!(workers, max_workers);

            sleep(worker_timeout * 2).await;
            let workers = worker_pool.get_worker_count();
            println!("cur workers: {}, extra should exit due to timeout", workers);
            assert_eq!(workers, min_workers);

            let (done_tx, done_rx) = mpsc::bounded_future_both(1);
            for j in 0..10 {
                worker_pool.submit(MyMsg(80 + j, done_tx.clone())).await;
                let _ = done_rx.recv().await;
            }
        });
        println!("closing");
        _worker_pool.close();
        assert_eq!(_worker_pool.get_worker_count(), 0);
    }
}