crossfire/mpmc/
bounded.rs

1use super::rx::*;
2use super::tx::*;
3use crate::channel::*;
4use crossbeam::queue::SegQueue;
5use std::sync::{
6    atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
7    Arc,
8};
9use std::task::*;
10
11/// Initiate a bounded channel that sender and receiver are async
12pub fn bounded_future_both<T: Unpin>(
13    size: usize,
14) -> (TxFuture<T, SharedFutureBoth>, RxFuture<T, SharedFutureBoth>) {
15    let (tx, rx) = crossbeam::channel::bounded(size);
16    let shared = Arc::new(SharedFutureBoth::new());
17
18    let tx_f = TxFuture::new(tx, shared.clone());
19    let rx_f = RxFuture::new(rx, shared);
20    (tx_f, rx_f)
21}
22
23/// Initiate a bounded channel that sender is async, receiver is blocking
24pub fn bounded_tx_future_rx_blocking<T: Unpin>(
25    size: usize,
26) -> (TxFuture<T, SharedSenderFRecvB>, RxBlocking<T, SharedSenderFRecvB>) {
27    let (tx, rx) = crossbeam::channel::bounded(size);
28    let shared = Arc::new(SharedSenderFRecvB::new());
29
30    let tx_f = TxFuture::new(tx, shared.clone());
31    let rx_b = RxBlocking::new(rx, shared);
32    (tx_f, rx_b)
33}
34
35/// Initiate a bounded channel that sender is blocking, receiver is sync
36pub fn bounded_tx_blocking_rx_future<T>(
37    size: usize,
38) -> (TxBlocking<T, SharedSenderBRecvF>, RxFuture<T, SharedSenderBRecvF>) {
39    let (tx, rx) = crossbeam::channel::bounded(size);
40    let shared = Arc::new(SharedSenderBRecvF::new());
41
42    let tx_b = TxBlocking::new(tx, shared.clone());
43    let rx_f = RxFuture::new(rx, shared);
44    (tx_b, rx_f)
45}
46
47pub struct SharedFutureBoth {
48    tx_count: AtomicUsize,
49    rx_count: AtomicUsize,
50    sender_waker: SegQueue<LockedWakerRef>,
51    recv_waker: SegQueue<LockedWakerRef>,
52    send_waker_tx_seq: AtomicU64,
53    send_waker_rx_seq: AtomicU64,
54    recv_waker_tx_seq: AtomicU64,
55    recv_waker_rx_seq: AtomicU64,
56    checking_sender: AtomicBool,
57    checking_recv: AtomicBool,
58}
59
60impl MPMCShared for SharedFutureBoth {
61    fn new() -> Self {
62        Self {
63            sender_waker: SegQueue::new(),
64            recv_waker: SegQueue::new(),
65            tx_count: AtomicUsize::new(1),
66            rx_count: AtomicUsize::new(1),
67            checking_sender: AtomicBool::new(false),
68            checking_recv: AtomicBool::new(false),
69            send_waker_tx_seq: AtomicU64::new(0),
70            send_waker_rx_seq: AtomicU64::new(0),
71            recv_waker_tx_seq: AtomicU64::new(0),
72            recv_waker_rx_seq: AtomicU64::new(0),
73        }
74    }
75
76    #[inline]
77    fn on_recv(&self) {
78        on_recv_m!(self)
79    }
80
81    #[inline]
82    fn on_send(&self) {
83        on_send_m!(self)
84    }
85
86    #[inline]
87    fn reg_recv(&self, ctx: &mut Context) -> Option<LockedWaker> {
88        reg_recv_m!(self, ctx)
89    }
90
91    #[inline]
92    fn reg_send(&self, ctx: &mut Context) -> Option<LockedWaker> {
93        reg_send_m!(self, ctx)
94    }
95
96    #[inline(always)]
97    fn add_tx(&self) {
98        let _ = self.tx_count.fetch_add(1, Ordering::SeqCst);
99    }
100
101    #[inline(always)]
102    fn add_rx(&self) {
103        let _ = self.rx_count.fetch_add(1, Ordering::SeqCst);
104    }
105
106    #[inline]
107    fn close_tx(&self) {
108        close_tx_common!(self)
109    }
110
111    #[inline]
112    fn close_rx(&self) {
113        close_rx_common!(self)
114    }
115
116    #[inline]
117    fn get_tx_count(&self) -> usize {
118        self.tx_count.load(Ordering::SeqCst)
119    }
120
121    fn get_waker_length(&self) -> (usize, usize) {
122        (self.sender_waker.len(), self.recv_waker.len())
123    }
124
125    #[inline]
126    fn clear_send_wakers(&self, waker: LockedWaker) {
127        clear_sender_wakers_common!(self, waker.get_seq())
128    }
129
130    #[inline]
131    fn clear_recv_wakers(&self, waker: LockedWaker) {
132        clear_recv_wakers_common!(self, waker.get_seq())
133    }
134}
135
136pub struct SharedSenderBRecvF {
137    tx_count: AtomicUsize,
138    rx_count: AtomicUsize,
139    recv_waker: SegQueue<LockedWakerRef>,
140    recv_waker_tx_seq: AtomicU64,
141    recv_waker_rx_seq: AtomicU64,
142    checking_recv: AtomicBool,
143}
144
145impl MPMCShared for SharedSenderBRecvF {
146    fn new() -> Self {
147        Self {
148            recv_waker: SegQueue::new(),
149            tx_count: AtomicUsize::new(1),
150            rx_count: AtomicUsize::new(1),
151            recv_waker_tx_seq: AtomicU64::new(0),
152            recv_waker_rx_seq: AtomicU64::new(0),
153            checking_recv: AtomicBool::new(false),
154        }
155    }
156
157    #[inline]
158    fn on_recv(&self) {}
159
160    #[inline]
161    fn on_send(&self) {
162        on_send_m!(self)
163    }
164
165    #[inline]
166    fn reg_recv(&self, ctx: &mut Context) -> Option<LockedWaker> {
167        reg_recv_m!(self, ctx)
168    }
169
170    #[inline]
171    fn reg_send(&self, _ctx: &mut Context) -> Option<LockedWaker> {
172        None
173    }
174
175    #[inline]
176    fn add_tx(&self) {
177        self.tx_count.fetch_add(1, Ordering::SeqCst);
178    }
179
180    #[inline]
181    fn add_rx(&self) {
182        self.rx_count.fetch_add(1, Ordering::SeqCst);
183    }
184
185    #[inline]
186    fn close_tx(&self) {
187        close_tx_common!(self)
188    }
189
190    #[inline]
191    fn close_rx(&self) {
192        self.rx_count.fetch_sub(1, Ordering::SeqCst);
193    }
194
195    #[inline]
196    fn get_tx_count(&self) -> usize {
197        self.tx_count.load(Ordering::SeqCst)
198    }
199
200    fn get_waker_length(&self) -> (usize, usize) {
201        (0, self.recv_waker.len())
202    }
203
204    #[inline]
205    fn clear_recv_wakers(&self, waker: LockedWaker) {
206        clear_recv_wakers_common!(self, waker.get_seq())
207    }
208}
209
210pub struct SharedSenderFRecvB {
211    tx_count: AtomicUsize,
212    rx_count: AtomicUsize,
213    sender_waker: SegQueue<LockedWakerRef>,
214    send_waker_tx_seq: AtomicU64,
215    send_waker_rx_seq: AtomicU64,
216    checking_sender: AtomicBool,
217}
218
219impl MPMCShared for SharedSenderFRecvB {
220    fn new() -> Self {
221        Self {
222            sender_waker: SegQueue::new(),
223            rx_count: AtomicUsize::new(1),
224            tx_count: AtomicUsize::new(1),
225            send_waker_tx_seq: AtomicU64::new(0),
226            send_waker_rx_seq: AtomicU64::new(0),
227            checking_sender: AtomicBool::new(false),
228        }
229    }
230
231    #[inline]
232    fn on_recv(&self) {
233        on_recv_m!(self)
234    }
235
236    #[inline]
237    fn on_send(&self) {}
238
239    #[inline]
240    fn reg_recv(&self, _ctx: &mut Context) -> Option<LockedWaker> {
241        None
242    }
243
244    #[inline]
245    fn reg_send(&self, ctx: &mut Context) -> Option<LockedWaker> {
246        reg_send_m!(self, ctx)
247    }
248
249    #[inline]
250    fn add_tx(&self) {
251        self.tx_count.fetch_add(1, Ordering::SeqCst);
252    }
253
254    #[inline]
255    fn add_rx(&self) {
256        self.rx_count.fetch_add(1, Ordering::SeqCst);
257    }
258
259    #[inline]
260    fn close_tx(&self) {
261        self.tx_count.fetch_sub(1, Ordering::SeqCst);
262    }
263
264    #[inline]
265    fn close_rx(&self) {
266        close_rx_common!(self)
267    }
268
269    #[inline]
270    fn get_tx_count(&self) -> usize {
271        return self.tx_count.load(Ordering::Acquire);
272    }
273
274    #[inline]
275    fn clear_send_wakers(&self, waker: LockedWaker) {
276        clear_sender_wakers_common!(self, waker.get_seq())
277    }
278
279    fn get_waker_length(&self) -> (usize, usize) {
280        (self.sender_waker.len(), 0)
281    }
282}
283
284#[cfg(test)]
285mod tests {
286
287    extern crate tokio;
288    use super::*;
289    use std::sync::atomic::{AtomicI32, Ordering};
290    use std::thread;
291    use std::time::{Duration, Instant};
292    use tokio::time::timeout;
293
294    #[test]
295    fn bench_std_sync_channel_performance() {
296        println!();
297        let total_message = 1000000;
298        let (tx, rx) = std::sync::mpsc::sync_channel(100);
299        let start = Instant::now();
300        thread::spawn(move || {
301            let _tx = tx.clone();
302            for i in 0..total_message {
303                let _ = _tx.send(i);
304            }
305        });
306
307        for _ in 0..total_message {
308            rx.recv().unwrap();
309        }
310        let end = Instant::now();
311
312        println!("{} message, single sender thread single receiver thread use std::sync::sync_channel, cost time:{} s",
313                 total_message, (total_message as f64) / end.duration_since(start).as_secs_f64());
314    }
315
316    #[test]
317    fn bench_crossbeam_channel_performance() {
318        println!();
319        let total_message = 1000000;
320        let (tx, rx) = crossbeam::channel::bounded(100);
321        let start = Instant::now();
322        thread::spawn(move || {
323            let _tx = tx.clone();
324            for i in 0..total_message {
325                let _ = _tx.send(i);
326            }
327        });
328
329        for _ in 0..total_message {
330            rx.recv().unwrap();
331        }
332        let end = Instant::now();
333
334        println!(
335            "{} message, single sender thread single receiver thread use crossbeam::channel, {} /s",
336            total_message,
337            (total_message as f64) / end.duration_since(start).as_secs_f64()
338        );
339    }
340
341    #[test]
342    fn bench_future_both_performance() {
343        println!();
344        let rt = tokio::runtime::Builder::new_multi_thread()
345            .worker_threads(2)
346            .enable_all()
347            .build()
348            .unwrap();
349        rt.block_on(async move {
350            let total_message = 1000000;
351            let (tx, rx) = bounded_future_both::<i32>(100);
352            let start = Instant::now();
353            tokio::spawn(async move {
354                println!("sender thread send {} message start", total_message);
355                for i in 0i32..total_message {
356                    let _ = tx.send(i).await;
357                    //println!("sent {}", i);
358                }
359                println!("sender thread send {} message end", total_message);
360            });
361
362            for _ in 0..total_message {
363                if let Ok(_i) = rx.recv().await {
364                    //println!("recv {}", _i);
365                }
366            }
367            let end = Instant::now();
368
369            println!(
370                "{} message, single sender thread single receiver thread use mpmc {} /s",
371                total_message,
372                (total_message as f64) / end.duration_since(start).as_secs_f64()
373            );
374        });
375    }
376
377    #[test]
378    fn bench_future_both_latency() {
379        println!();
380        let rt = tokio::runtime::Builder::new_multi_thread()
381            .worker_threads(2)
382            .enable_all()
383            .build()
384            .unwrap();
385        rt.block_on(async move {
386            let total_message = 10000;
387            let (tx, rx) = bounded_future_both::<i32>(100);
388            let (tx_done, rx_done) = bounded_future_both::<()>(1);
389            let start = Instant::now();
390
391            let rx1 = rx.clone();
392            let tx_done1 = tx_done.clone();
393            tokio::spawn(async move {
394                while let Ok(_i) = rx1.recv().await {
395                    tx_done1.send(()).await;
396                }
397            });
398            let rx2 = rx.clone();
399            let tx_done2 = tx_done.clone();
400            tokio::spawn(async move {
401                while let Ok(_i) = rx2.recv().await {
402                    tx_done2.send(()).await;
403                }
404            });
405            println!("sender thread send {} message start", total_message);
406            for i in 0i32..total_message {
407                let _ = tx.send(i).await;
408                let _ = rx_done.recv().await;
409                //println!("sent {}", i);
410            }
411            println!("sender thread send {} message end", total_message);
412            let end = Instant::now();
413
414            println!(
415                "{} message, 2 channel roundtrip use mpmc {} /s",
416                total_message,
417                (total_message as f64) / end.duration_since(start).as_secs_f64()
418            );
419        });
420    }
421
422    #[test]
423    fn bench_tx_blocking_rx_future_performance() {
424        println!();
425        let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
426        let total_message = 1000000;
427        let (tx, rx_f) = bounded_tx_blocking_rx_future::<i32>(100);
428        let start = Instant::now();
429        thread::spawn(move || {
430            for i in 0..total_message {
431                let _ = tx.send(i);
432            }
433        });
434        rt.block_on(async move {
435            for _ in 0..total_message {
436                let _ = rx_f.recv().await;
437            }
438            let end = Instant::now();
439
440            println!(
441                "{} message, single sender thread single receiver thread use mpmc {} /s",
442                total_message,
443                (total_message as f64) / end.duration_since(start).as_secs_f64()
444            );
445        });
446    }
447
448    #[test]
449    fn bench_tx_future_rx_blocking_performance() {
450        println!();
451        let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
452        let total_message = 1000000;
453        let (tx_f, rx) = bounded_tx_future_rx_blocking::<i32>(100);
454        let start = Instant::now();
455        let th = thread::spawn(move || {
456            for _i in 0..total_message {
457                let _r = rx.recv();
458                //                assert_eq!(r.unwrap(), i);
459            }
460            let end = Instant::now();
461            println!(
462                "{} message, single sender thread single receiver thread use mpmc {} /s",
463                total_message,
464                (total_message as f64) / end.duration_since(start).as_secs_f64()
465            );
466        });
467        rt.block_on(async move {
468            for i in 0i32..total_message {
469                let _ = tx_f.send(i).await;
470            }
471        });
472        let _ = th.join();
473    }
474
475    #[test]
476    fn bench_tokio_mpsc_performance() {
477        println!();
478        let rt = tokio::runtime::Builder::new_multi_thread()
479            .worker_threads(2)
480            .enable_all()
481            .build()
482            .unwrap();
483        rt.block_on(async move {
484            let total_message = 1000000;
485            let (tx, mut rx) = tokio::sync::mpsc::channel::<i32>(100);
486            let start = Instant::now();
487            tokio::spawn(async move {
488                println!("sender thread send {} message start", total_message);
489                let mut _tx = tx.clone();
490                for i in 0i32..total_message {
491                    let _ = _tx.send(i).await;
492                }
493                println!("sender thread send {} message end", total_message);
494            });
495
496            println!("receiver thread recv {} message start", total_message);
497            for _ in 0..total_message {
498                rx.recv().await;
499            }
500            println!("receiver thread recv {} message end", total_message);
501            let end = Instant::now();
502
503            println!("{} message, single sender thread single receiver thread use tokio::sync::channel, {} /s",
504                     total_message, (total_message as f64) / end.duration_since(start).as_secs_f64());
505        });
506    }
507
508    #[test]
509    fn test_mpmc_sender_close() {
510        let (tx, rx) = bounded_tx_blocking_rx_future::<i32>(10);
511        let total_msg_count = 5;
512        for i in 0..total_msg_count {
513            let _ = tx.send(i);
514        }
515        drop(tx);
516        let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
517        rt.block_on(async move {
518            let mut recv_msg_count = 0;
519            loop {
520                match rx.recv().await {
521                    Ok(_) => {
522                        recv_msg_count += 1;
523                    }
524                    Err(_) => {
525                        break;
526                    }
527                }
528            }
529            assert_eq!(recv_msg_count, total_msg_count);
530        });
531    }
532
533    #[test]
534    fn test_future_both_1_thread_single() {
535        let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
536        rt.block_on(async move {
537            let (tx, rx) = bounded_future_both::<i32>(10);
538            let rx_res = rx.try_recv();
539            assert!(rx_res.is_err());
540            assert!(rx_res.unwrap_err().is_empty());
541            for i in 0i32..10 {
542                let tx_res = tx.try_send(i);
543                assert!(tx_res.is_ok());
544            }
545            let tx_res = tx.try_send(11);
546            assert!(tx_res.is_err());
547            assert!(tx_res.unwrap_err().is_full());
548
549            let (noti_tx, noti_rx) = tokio::sync::oneshot::channel::<bool>();
550            tokio::spawn(async move {
551                for i in 0i32..12 {
552                    match rx.recv().await {
553                        Ok(j) => {
554                            println!("recv {}", i);
555                            assert_eq!(i, j);
556                        }
557                        Err(e) => {
558                            panic!("error {}", e);
559                        }
560                    }
561                }
562                let res = rx.recv().await;
563                assert!(res.is_err());
564                println!("rx close");
565                let _ = noti_tx.send(true);
566            });
567            assert!(tx.send(10).await.is_ok());
568            tokio::time::sleep(Duration::from_secs(1)).await;
569            assert!(tx.send(11).await.is_ok());
570            drop(tx);
571            let _ = noti_rx.await;
572        });
573    }
574
575    #[test]
576    fn test_tx_blocking_rx_future_1_thread_single() {
577        let rt = tokio::runtime::Builder::new_multi_thread()
578            .enable_all()
579            .worker_threads(1)
580            .build()
581            .unwrap();
582        rt.block_on(async move {
583            let (tx, rx) = bounded_tx_blocking_rx_future::<i32>(10);
584            let rx_res = rx.try_recv();
585            assert!(rx_res.is_err());
586            assert!(rx_res.unwrap_err().is_empty());
587            for i in 0i32..10 {
588                let tx_res = tx.send(i);
589                assert!(tx_res.is_ok());
590            }
591            let tx_res = tx.try_send(11);
592            assert!(tx_res.is_err());
593            assert!(tx_res.unwrap_err().is_full());
594
595            let (noti_tx, noti_rx) = tokio::sync::oneshot::channel::<bool>();
596            tokio::spawn(async move {
597                for i in 0i32..12 {
598                    match rx.recv().await {
599                        Ok(j) => {
600                            println!("recv {}", i);
601                            assert_eq!(i, j);
602                        }
603                        Err(e) => {
604                            panic!("error {}", e);
605                        }
606                    }
607                }
608                let res = rx.recv().await;
609                assert!(res.is_err());
610                println!("rx close");
611                let _ = noti_tx.send(true);
612            });
613            assert!(tx.send(10).is_ok());
614            tokio::time::sleep(Duration::from_secs(1)).await;
615            assert!(tx.send(11).is_ok());
616            drop(tx);
617            let _ = noti_rx.await;
618        });
619    }
620
621    #[test]
622    fn test_tx_future_rx_blocking_1_thread_single() {
623        let rt = tokio::runtime::Builder::new_multi_thread()
624            .enable_all()
625            .worker_threads(1)
626            .build()
627            .unwrap();
628        rt.block_on(async move {
629            let (tx, rx) = bounded_tx_future_rx_blocking::<i32>(10);
630            let rx_res = rx.try_recv();
631            assert!(rx_res.is_err());
632            assert!(rx_res.unwrap_err().is_empty());
633            for i in 0i32..10 {
634                let tx_res = tx.send(i).await;
635                assert!(tx_res.is_ok());
636            }
637            let tx_res = tx.try_send(11);
638            assert!(tx_res.is_err());
639            assert!(tx_res.unwrap_err().is_full());
640
641            let (noti_tx, noti_rx) = tokio::sync::oneshot::channel::<bool>();
642            tokio::spawn(async move {
643                for i in 0i32..5 {
644                    assert!(tx.send(10 + i).await.is_ok());
645                    tokio::time::sleep(Duration::from_secs(1)).await;
646                }
647                println!("tx close");
648                let _ = noti_tx.send(true);
649            });
650            for i in 0i32..15 {
651                match rx.recv() {
652                    Ok(j) => {
653                        println!("recv {}", i);
654                        assert_eq!(i, j);
655                    }
656                    Err(e) => {
657                        panic!("error {}", e);
658                    }
659                }
660            }
661            let res = rx.recv();
662            assert!(res.is_err());
663            drop(rx);
664            let _ = noti_rx.await;
665        });
666    }
667
668    #[test]
669    fn test_future_both_1_thread_multi_4tx_2rx() {
670        _future_both_thread_multi(1, 4, 2);
671    }
672
673    #[test]
674    fn test_future_both_2_thread_multi_4tx_2rx() {
675        _future_both_thread_multi(2, 4, 2);
676    }
677
678    #[test]
679    fn test_future_both_8_thread_multi_4tx_4rx() {
680        _future_both_thread_multi(8, 4, 4);
681    }
682
683    fn _future_both_thread_multi(real_threads: usize, tx_count: usize, rx_count: usize) {
684        let rt = tokio::runtime::Builder::new_multi_thread()
685            .worker_threads(real_threads)
686            .enable_all()
687            .build()
688            .unwrap();
689        rt.block_on(async move {
690            let (tx, rx) = bounded_future_both::<i32>(10);
691            let (noti_tx, mut noti_rx) = tokio::sync::mpsc::channel::<usize>(tx_count + rx_count);
692
693            let counter = Arc::new(AtomicI32::new(0));
694            let round = 100000;
695            println!("");
696            for _tx_i in 0..tx_count {
697                let _tx = tx.clone();
698                let mut _noti_tx = noti_tx.clone();
699                let _round = round;
700                tokio::spawn(async move {
701                    for i in 0i32.._round {
702                        match _tx.send(i).await {
703                            Err(e) => panic!("{}", e),
704                            _ => {}
705                        }
706                    }
707                    let _ = _noti_tx.send(_tx_i).await;
708                    println!("tx {} exit", _tx_i);
709                });
710            }
711            for _rx_i in 0..rx_count {
712                let _rx = rx.clone();
713                let mut _noti_tx = noti_tx.clone();
714                let _counter = counter.clone();
715                tokio::spawn(async move {
716                    'A: loop {
717                        match _rx.recv().await {
718                            Ok(_i) => {
719                                _counter.as_ref().fetch_add(1i32, Ordering::SeqCst);
720                                //print!("recv {} {}\r", _rx_i, i);
721                            }
722                            Err(_) => break 'A,
723                        }
724                    }
725                    let _ = _noti_tx.send(_rx_i).await;
726                    println!("rx {} exit", _rx_i);
727                });
728            }
729            drop(tx);
730            drop(rx);
731            drop(noti_tx);
732            for _ in 0..(rx_count + tx_count) {
733                match noti_rx.recv().await {
734                    Some(_) => {}
735                    None => break,
736                }
737            }
738            assert_eq!(counter.as_ref().load(Ordering::Acquire), round * (tx_count as i32));
739        });
740    }
741
742    fn _tx_blocking_rx_future_multi(real_threads: usize, tx_count: usize, rx_count: usize) {
743        let rt = tokio::runtime::Builder::new_multi_thread()
744            .worker_threads(real_threads)
745            .enable_all()
746            .build()
747            .unwrap();
748        let (tx, rx) = bounded_tx_blocking_rx_future::<i32>(10);
749        let counter = Arc::new(AtomicI32::new(0));
750        let round = 100000;
751        let mut tx_ths = Vec::new();
752        let send_msg = Arc::new(AtomicI32::new(0));
753        for _tx_i in 0..tx_count {
754            let _tx = tx.clone();
755            let _round = round;
756            let _send_msg = send_msg.clone();
757            tx_ths.push(thread::spawn(move || {
758                loop {
759                    let i = _send_msg.fetch_add(1, Ordering::SeqCst);
760                    if i >= round {
761                        break;
762                    }
763                    match _tx.send(i) {
764                        Err(e) => panic!("{}", e),
765                        _ => {
766                            //println!("tx {} {}", _tx_i, i);
767                        }
768                    }
769                }
770                println!("tx {} exit", _tx_i);
771            }));
772        }
773        drop(tx);
774        rt.block_on(async move {
775            let (noti_tx, mut noti_rx) = tokio::sync::mpsc::channel::<usize>(rx_count);
776            for _rx_i in 0..rx_count {
777                let _rx = rx.clone();
778                let mut _noti_tx = noti_tx.clone();
779                let _counter = counter.clone();
780                tokio::spawn(async move {
781                    'A: loop {
782                        match _rx.recv().await {
783                            Ok(_i) => {
784                                _counter.as_ref().fetch_add(1i32, Ordering::SeqCst);
785                                //println!("rx {} {}\r", _rx_i, _i);
786                            }
787                            Err(_) => break 'A,
788                        }
789                    }
790                    println!("rx {} exiting", _rx_i);
791                    let _ = _noti_tx.send(_rx_i).await;
792                    println!("rx {} exit", _rx_i);
793                });
794            }
795            drop(rx);
796            drop(noti_tx);
797            for _ in 0..(rx_count) {
798                match noti_rx.recv().await {
799                    Some(_) => {}
800                    None => break,
801                }
802            }
803            assert_eq!(counter.as_ref().load(Ordering::Acquire), round as i32);
804        });
805        for th in tx_ths {
806            let _ = th.join();
807        }
808    }
809
810    #[test]
811    fn test_tx_blocking_rx_future_1_thread_multi_4tx_2rx() {
812        _tx_blocking_rx_future_multi(1, 4, 2);
813    }
814
815    #[test]
816    fn test_tx_blocking_rx_future_2_thread_multi_4tx_3rx() {
817        _tx_blocking_rx_future_multi(2, 4, 3);
818    }
819
820    #[test]
821    fn test_tx_blocking_rx_future_8_thread_multi_4tx_4rx() {
822        _tx_blocking_rx_future_multi(8, 4, 4);
823    }
824
825    fn _tx_future_rx_blocking_multi(real_threads: usize, tx_count: usize, rx_count: usize) {
826        let rt = tokio::runtime::Builder::new_multi_thread()
827            .worker_threads(real_threads)
828            .enable_all()
829            .build()
830            .unwrap();
831        let (tx, rx) = bounded_tx_future_rx_blocking::<i32>(10);
832        let counter = Arc::new(AtomicI32::new(0));
833        let round = 100000;
834        let mut rx_ths = Vec::new();
835        for _rx_i in 0..rx_count {
836            let _rx = rx.clone();
837            let _round = round;
838            let _counter = counter.clone();
839            rx_ths.push(thread::spawn(move || {
840                'A: loop {
841                    match _rx.recv() {
842                        Ok(_) => {
843                            _counter.as_ref().fetch_add(1i32, Ordering::SeqCst);
844                            //print!("{} {}\r", _rx_i, i);
845                        }
846                        Err(_) => break 'A,
847                    }
848                }
849                println!("rx {} exit", _rx_i);
850            }));
851        }
852        drop(rx);
853        rt.block_on(async move {
854            let (noti_tx, mut noti_rx) = tokio::sync::mpsc::channel::<usize>(tx_count);
855            for _tx_i in 0..tx_count {
856                let _tx = tx.clone();
857                let mut _noti_tx = noti_tx.clone();
858                tokio::spawn(async move {
859                    for i in 0i32..round {
860                        match _tx.send(i).await {
861                            Err(e) => panic!("{}", e),
862                            _ => {}
863                        }
864                    }
865                    let _ = _noti_tx.send(_tx_i).await;
866                    println!("tx {} exit", _tx_i);
867                });
868            }
869            drop(tx);
870            drop(noti_tx);
871            for _ in 0..(tx_count) {
872                match noti_rx.recv().await {
873                    Some(_) => {}
874                    None => break,
875                }
876            }
877        });
878        for th in rx_ths {
879            let _ = th.join();
880        }
881        assert_eq!(counter.as_ref().load(Ordering::Acquire), round * (tx_count as i32));
882    }
883
884    #[test]
885    fn test_tx_future_rx_blocking_1_thread_multi_4tx_2rx() {
886        _tx_future_rx_blocking_multi(1, 4, 2);
887    }
888
889    #[test]
890    fn test_tx_future_rx_blocking_2_thread_multi_4tx_3rx() {
891        _tx_future_rx_blocking_multi(2, 4, 3);
892    }
893
894    #[test]
895    fn test_tx_future_rx_blocking_8_thread_multi_4tx_4rx() {
896        _tx_future_rx_blocking_multi(8, 4, 4);
897    }
898
899    #[test]
900    fn test_timeout_future_both() {
901        let rt = tokio::runtime::Builder::new_multi_thread()
902            .worker_threads(2)
903            .enable_all()
904            .build()
905            .unwrap();
906        rt.block_on(async move {
907            let (tx, rx) = bounded_future_both::<i32>(100);
908            assert!(timeout(Duration::from_secs(1), rx.recv()).await.is_err());
909            let (tx_done, rx_done) = crossbeam::channel::bounded::<i32>(1);
910            tokio::spawn(async move {
911                match rx.recv().await {
912                    Ok(item) => {
913                        let _ = tx_done.send(item);
914                    }
915                    Err(_e) => {
916                        println!("recv error");
917                    }
918                }
919            });
920            let _ = tx.send(1).await;
921            assert_eq!(rx_done.recv().unwrap(), 1);
922        });
923    }
924}