1use super::rx::*;
2use super::tx::*;
3use crate::channel::*;
4use crossbeam::queue::SegQueue;
5use std::sync::{
6 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
7 Arc,
8};
9use std::task::*;
10
11pub fn bounded_future_both<T: Unpin>(
13 size: usize,
14) -> (TxFuture<T, SharedFutureBoth>, RxFuture<T, SharedFutureBoth>) {
15 let (tx, rx) = crossbeam::channel::bounded(size);
16 let shared = Arc::new(SharedFutureBoth::new());
17
18 let tx_f = TxFuture::new(tx, shared.clone());
19 let rx_f = RxFuture::new(rx, shared);
20 (tx_f, rx_f)
21}
22
23pub fn bounded_tx_future_rx_blocking<T: Unpin>(
25 size: usize,
26) -> (TxFuture<T, SharedSenderFRecvB>, RxBlocking<T, SharedSenderFRecvB>) {
27 let (tx, rx) = crossbeam::channel::bounded(size);
28 let shared = Arc::new(SharedSenderFRecvB::new());
29
30 let tx_f = TxFuture::new(tx, shared.clone());
31 let rx_b = RxBlocking::new(rx, shared);
32 (tx_f, rx_b)
33}
34
35pub fn bounded_tx_blocking_rx_future<T>(
37 size: usize,
38) -> (TxBlocking<T, SharedSenderBRecvF>, RxFuture<T, SharedSenderBRecvF>) {
39 let (tx, rx) = crossbeam::channel::bounded(size);
40 let shared = Arc::new(SharedSenderBRecvF::new());
41
42 let tx_b = TxBlocking::new(tx, shared.clone());
43 let rx_f = RxFuture::new(rx, shared);
44 (tx_b, rx_f)
45}
46
/// State shared by an async-sender / async-receiver channel pair:
/// live-handle counters plus queues of parked task wakers for each side.
pub struct SharedFutureBoth {
    // Number of live sender handles; starts at 1 (see `add_tx` / `close_tx`).
    tx_count: AtomicUsize,
    // Number of live receiver handles; starts at 1 (see `add_rx` / `close_rx`).
    rx_count: AtomicUsize,
    // Wakers registered by sender tasks (pushed/popped by the reg/on macros
    // from crate::channel).
    sender_waker: SegQueue<LockedWakerRef>,
    // Wakers registered by receiver tasks.
    recv_waker: SegQueue<LockedWakerRef>,
    // Sequence counters and in-progress flags consumed by the
    // reg_*/on_*/clear_* macros in crate::channel; the exact protocol lives
    // there. NOTE(review): semantics inferred from names — confirm against
    // the macro definitions.
    send_waker_tx_seq: AtomicU64,
    send_waker_rx_seq: AtomicU64,
    recv_waker_tx_seq: AtomicU64,
    recv_waker_rx_seq: AtomicU64,
    checking_sender: AtomicBool,
    checking_recv: AtomicBool,
}
59
/// Both sides are async, so both waker queues and both sets of sequence
/// counters are in play. All non-trivial behavior is delegated to the
/// shared macros defined in crate::channel.
impl MPMCShared for SharedFutureBoth {
    fn new() -> Self {
        Self {
            sender_waker: SegQueue::new(),
            recv_waker: SegQueue::new(),
            // Counters start at 1: the constructor hands out one tx and one rx.
            tx_count: AtomicUsize::new(1),
            rx_count: AtomicUsize::new(1),
            checking_sender: AtomicBool::new(false),
            checking_recv: AtomicBool::new(false),
            send_waker_tx_seq: AtomicU64::new(0),
            send_waker_rx_seq: AtomicU64::new(0),
            recv_waker_tx_seq: AtomicU64::new(0),
            recv_waker_rx_seq: AtomicU64::new(0),
        }
    }

    // Called after a message is taken out; wakes a pending sender
    // (macro from crate::channel).
    #[inline]
    fn on_recv(&self) {
        on_recv_m!(self)
    }

    // Called after a message is put in; wakes a pending receiver.
    #[inline]
    fn on_send(&self) {
        on_send_m!(self)
    }

    // Registers the current task's waker so a future `on_send` can wake it.
    #[inline]
    fn reg_recv(&self, ctx: &mut Context) -> Option<LockedWaker> {
        reg_recv_m!(self, ctx)
    }

    // Registers the current task's waker so a future `on_recv` can wake it.
    #[inline]
    fn reg_send(&self, ctx: &mut Context) -> Option<LockedWaker> {
        reg_send_m!(self, ctx)
    }

    #[inline(always)]
    fn add_tx(&self) {
        let _ = self.tx_count.fetch_add(1, Ordering::SeqCst);
    }

    #[inline(always)]
    fn add_rx(&self) {
        let _ = self.rx_count.fetch_add(1, Ordering::SeqCst);
    }

    // Drops one sender handle; the common macro presumably also wakes
    // receivers when the last sender goes away — confirm in crate::channel.
    #[inline]
    fn close_tx(&self) {
        close_tx_common!(self)
    }

    // Drops one receiver handle (counterpart of `close_tx`).
    #[inline]
    fn close_rx(&self) {
        close_rx_common!(self)
    }

    #[inline]
    fn get_tx_count(&self) -> usize {
        self.tx_count.load(Ordering::SeqCst)
    }

    // (sender queue length, receiver queue length) — used for diagnostics.
    fn get_waker_length(&self) -> (usize, usize) {
        (self.sender_waker.len(), self.recv_waker.len())
    }

    // Discards stale sender-waker registrations up to this waker's sequence
    // number (macro in crate::channel).
    #[inline]
    fn clear_send_wakers(&self, waker: LockedWaker) {
        clear_sender_wakers_common!(self, waker.get_seq())
    }

    // Discards stale receiver-waker registrations up to this waker's sequence.
    #[inline]
    fn clear_recv_wakers(&self, waker: LockedWaker) {
        clear_recv_wakers_common!(self, waker.get_seq())
    }
}
135
/// State shared by a blocking-sender / async-receiver channel pair.
/// Only receiver-side wakers exist: the blocking sender parks inside the
/// underlying crossbeam channel rather than on a task `Waker`.
pub struct SharedSenderBRecvF {
    // Number of live sender handles; starts at 1.
    tx_count: AtomicUsize,
    // Number of live receiver handles; starts at 1.
    rx_count: AtomicUsize,
    // Wakers registered by async receiver tasks.
    recv_waker: SegQueue<LockedWakerRef>,
    // Sequence counters / flag consumed by the reg_recv/on_send/clear macros
    // in crate::channel — see the macro definitions for the protocol.
    recv_waker_tx_seq: AtomicU64,
    recv_waker_rx_seq: AtomicU64,
    checking_recv: AtomicBool,
}
144
/// Blocking sender + async receiver: only the receiver side registers
/// wakers, so the sender-side hooks are no-ops.
impl MPMCShared for SharedSenderBRecvF {
    fn new() -> Self {
        Self {
            recv_waker: SegQueue::new(),
            // Counters start at 1: the constructor hands out one tx and one rx.
            tx_count: AtomicUsize::new(1),
            rx_count: AtomicUsize::new(1),
            recv_waker_tx_seq: AtomicU64::new(0),
            recv_waker_rx_seq: AtomicU64::new(0),
            checking_recv: AtomicBool::new(false),
        }
    }

    // No-op: senders are blocking and never park on a task waker.
    #[inline]
    fn on_recv(&self) {}

    // A message was enqueued; wake a parked async receiver
    // (macro from crate::channel).
    #[inline]
    fn on_send(&self) {
        on_send_m!(self)
    }

    // Registers the current receiver task's waker for a later `on_send`.
    #[inline]
    fn reg_recv(&self, ctx: &mut Context) -> Option<LockedWaker> {
        reg_recv_m!(self, ctx)
    }

    // Senders never await, so there is never a sender waker to register.
    #[inline]
    fn reg_send(&self, _ctx: &mut Context) -> Option<LockedWaker> {
        None
    }

    #[inline]
    fn add_tx(&self) {
        self.tx_count.fetch_add(1, Ordering::SeqCst);
    }

    #[inline]
    fn add_rx(&self) {
        self.rx_count.fetch_add(1, Ordering::SeqCst);
    }

    // The common macro also handles waking receivers when the last sender
    // disappears (see crate::channel).
    #[inline]
    fn close_tx(&self) {
        close_tx_common!(self)
    }

    // Plain decrement: there is no sender waker queue to drain here, since
    // blocking senders are woken by the crossbeam channel itself.
    #[inline]
    fn close_rx(&self) {
        self.rx_count.fetch_sub(1, Ordering::SeqCst);
    }

    #[inline]
    fn get_tx_count(&self) -> usize {
        self.tx_count.load(Ordering::SeqCst)
    }

    // No sender waker queue exists, so the sender slot is always 0.
    fn get_waker_length(&self) -> (usize, usize) {
        (0, self.recv_waker.len())
    }

    // Discards stale receiver-waker registrations up to this waker's sequence
    // (macro in crate::channel).
    #[inline]
    fn clear_recv_wakers(&self, waker: LockedWaker) {
        clear_recv_wakers_common!(self, waker.get_seq())
    }
}
209
/// State shared by an async-sender / blocking-receiver channel pair.
/// Only sender-side wakers exist: the blocking receiver parks inside the
/// underlying crossbeam channel rather than on a task `Waker`.
pub struct SharedSenderFRecvB {
    // Number of live sender handles; starts at 1.
    tx_count: AtomicUsize,
    // Number of live receiver handles; starts at 1.
    rx_count: AtomicUsize,
    // Wakers registered by async sender tasks.
    sender_waker: SegQueue<LockedWakerRef>,
    // Sequence counters / flag consumed by the reg_send/on_recv/clear macros
    // in crate::channel — see the macro definitions for the protocol.
    send_waker_tx_seq: AtomicU64,
    send_waker_rx_seq: AtomicU64,
    checking_sender: AtomicBool,
}
218
219impl MPMCShared for SharedSenderFRecvB {
220 fn new() -> Self {
221 Self {
222 sender_waker: SegQueue::new(),
223 rx_count: AtomicUsize::new(1),
224 tx_count: AtomicUsize::new(1),
225 send_waker_tx_seq: AtomicU64::new(0),
226 send_waker_rx_seq: AtomicU64::new(0),
227 checking_sender: AtomicBool::new(false),
228 }
229 }
230
231 #[inline]
232 fn on_recv(&self) {
233 on_recv_m!(self)
234 }
235
236 #[inline]
237 fn on_send(&self) {}
238
239 #[inline]
240 fn reg_recv(&self, _ctx: &mut Context) -> Option<LockedWaker> {
241 None
242 }
243
244 #[inline]
245 fn reg_send(&self, ctx: &mut Context) -> Option<LockedWaker> {
246 reg_send_m!(self, ctx)
247 }
248
249 #[inline]
250 fn add_tx(&self) {
251 self.tx_count.fetch_add(1, Ordering::SeqCst);
252 }
253
254 #[inline]
255 fn add_rx(&self) {
256 self.rx_count.fetch_add(1, Ordering::SeqCst);
257 }
258
259 #[inline]
260 fn close_tx(&self) {
261 self.tx_count.fetch_sub(1, Ordering::SeqCst);
262 }
263
264 #[inline]
265 fn close_rx(&self) {
266 close_rx_common!(self)
267 }
268
269 #[inline]
270 fn get_tx_count(&self) -> usize {
271 return self.tx_count.load(Ordering::Acquire);
272 }
273
274 #[inline]
275 fn clear_send_wakers(&self, waker: LockedWaker) {
276 clear_sender_wakers_common!(self, waker.get_seq())
277 }
278
279 fn get_waker_length(&self) -> (usize, usize) {
280 (self.sender_waker.len(), 0)
281 }
282}
283
284#[cfg(test)]
285mod tests {
286
287 extern crate tokio;
288 use super::*;
289 use std::sync::atomic::{AtomicI32, Ordering};
290 use std::thread;
291 use std::time::{Duration, Instant};
292 use tokio::time::timeout;
293
294 #[test]
295 fn bench_std_sync_channel_performance() {
296 println!();
297 let total_message = 1000000;
298 let (tx, rx) = std::sync::mpsc::sync_channel(100);
299 let start = Instant::now();
300 thread::spawn(move || {
301 let _tx = tx.clone();
302 for i in 0..total_message {
303 let _ = _tx.send(i);
304 }
305 });
306
307 for _ in 0..total_message {
308 rx.recv().unwrap();
309 }
310 let end = Instant::now();
311
312 println!("{} message, single sender thread single receiver thread use std::sync::sync_channel, cost time:{} s",
313 total_message, (total_message as f64) / end.duration_since(start).as_secs_f64());
314 }
315
316 #[test]
317 fn bench_crossbeam_channel_performance() {
318 println!();
319 let total_message = 1000000;
320 let (tx, rx) = crossbeam::channel::bounded(100);
321 let start = Instant::now();
322 thread::spawn(move || {
323 let _tx = tx.clone();
324 for i in 0..total_message {
325 let _ = _tx.send(i);
326 }
327 });
328
329 for _ in 0..total_message {
330 rx.recv().unwrap();
331 }
332 let end = Instant::now();
333
334 println!(
335 "{} message, single sender thread single receiver thread use crossbeam::channel, {} /s",
336 total_message,
337 (total_message as f64) / end.duration_since(start).as_secs_f64()
338 );
339 }
340
    /// Benchmark: 1M messages through `bounded_future_both` (capacity 100)
    /// on a 2-worker tokio runtime — one sender task, receiver in `block_on`.
    /// Prints throughput in messages per second.
    #[test]
    fn bench_future_both_performance() {
        println!();
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async move {
            let total_message = 1000000;
            let (tx, rx) = bounded_future_both::<i32>(100);
            let start = Instant::now();
            tokio::spawn(async move {
                println!("sender thread send {} message start", total_message);
                for i in 0i32..total_message {
                    let _ = tx.send(i).await;
                }
                println!("sender thread send {} message end", total_message);
            });

            for _ in 0..total_message {
                // Payload is ignored; only channel throughput is measured.
                if let Ok(_i) = rx.recv().await {
                }
            }
            let end = Instant::now();

            println!(
                "{} message, single sender thread single receiver thread use mpmc {} /s",
                total_message,
                (total_message as f64) / end.duration_since(start).as_secs_f64()
            );
        });
    }
376
    /// Latency benchmark: each `tx.send` is followed by waiting for an echo
    /// on a second channel (`tx_done`/`rx_done`), so the printed rate measures
    /// full round trips across two channels, with two competing echo tasks.
    #[test]
    fn bench_future_both_latency() {
        println!();
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async move {
            let total_message = 10000;
            let (tx, rx) = bounded_future_both::<i32>(100);
            let (tx_done, rx_done) = bounded_future_both::<()>(1);
            let start = Instant::now();

            // Two echo tasks: each received message is acknowledged on the
            // done-channel; either task may pick up any given message.
            let rx1 = rx.clone();
            let tx_done1 = tx_done.clone();
            tokio::spawn(async move {
                while let Ok(_i) = rx1.recv().await {
                    tx_done1.send(()).await;
                }
            });
            let rx2 = rx.clone();
            let tx_done2 = tx_done.clone();
            tokio::spawn(async move {
                while let Ok(_i) = rx2.recv().await {
                    tx_done2.send(()).await;
                }
            });
            println!("sender thread send {} message start", total_message);
            for i in 0i32..total_message {
                let _ = tx.send(i).await;
                // Wait for the ack before sending the next message — this
                // serializes the benchmark into round trips.
                let _ = rx_done.recv().await;
            }
            println!("sender thread send {} message end", total_message);
            let end = Instant::now();

            println!(
                "{} message, 2 channel roundtrip use mpmc {} /s",
                total_message,
                (total_message as f64) / end.duration_since(start).as_secs_f64()
            );
        });
    }
421
    /// Benchmark: blocking sender thread feeding an async receiver
    /// (`bounded_tx_blocking_rx_future`, capacity 100), 1M messages.
    /// Prints throughput in messages per second.
    #[test]
    fn bench_tx_blocking_rx_future_performance() {
        println!();
        let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
        let total_message = 1000000;
        let (tx, rx_f) = bounded_tx_blocking_rx_future::<i32>(100);
        let start = Instant::now();
        // Sender runs on a plain OS thread, outside the runtime.
        thread::spawn(move || {
            for i in 0..total_message {
                let _ = tx.send(i);
            }
        });
        rt.block_on(async move {
            for _ in 0..total_message {
                let _ = rx_f.recv().await;
            }
            let end = Instant::now();

            println!(
                "{} message, single sender thread single receiver thread use mpmc {} /s",
                total_message,
                (total_message as f64) / end.duration_since(start).as_secs_f64()
            );
        });
    }
447
    /// Benchmark: async sender feeding a blocking receiver thread
    /// (`bounded_tx_future_rx_blocking`, capacity 100), 1M messages.
    /// The receiver thread measures and prints the throughput.
    #[test]
    fn bench_tx_future_rx_blocking_performance() {
        println!();
        let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
        let total_message = 1000000;
        let (tx_f, rx) = bounded_tx_future_rx_blocking::<i32>(100);
        let start = Instant::now();
        // Receiver runs on a plain OS thread, outside the runtime.
        let th = thread::spawn(move || {
            for _i in 0..total_message {
                let _r = rx.recv();
            }
            let end = Instant::now();
            println!(
                "{} message, single sender thread single receiver thread use mpmc {} /s",
                total_message,
                (total_message as f64) / end.duration_since(start).as_secs_f64()
            );
        });
        rt.block_on(async move {
            for i in 0i32..total_message {
                let _ = tx_f.send(i).await;
            }
        });
        // Ensure the receiver finished (and printed) before the test exits.
        let _ = th.join();
    }
474
475 #[test]
476 fn bench_tokio_mpsc_performance() {
477 println!();
478 let rt = tokio::runtime::Builder::new_multi_thread()
479 .worker_threads(2)
480 .enable_all()
481 .build()
482 .unwrap();
483 rt.block_on(async move {
484 let total_message = 1000000;
485 let (tx, mut rx) = tokio::sync::mpsc::channel::<i32>(100);
486 let start = Instant::now();
487 tokio::spawn(async move {
488 println!("sender thread send {} message start", total_message);
489 let mut _tx = tx.clone();
490 for i in 0i32..total_message {
491 let _ = _tx.send(i).await;
492 }
493 println!("sender thread send {} message end", total_message);
494 });
495
496 println!("receiver thread recv {} message start", total_message);
497 for _ in 0..total_message {
498 rx.recv().await;
499 }
500 println!("receiver thread recv {} message end", total_message);
501 let end = Instant::now();
502
503 println!("{} message, single sender thread single receiver thread use tokio::sync::channel, {} /s",
504 total_message, (total_message as f64) / end.duration_since(start).as_secs_f64());
505 });
506 }
507
508 #[test]
509 fn test_mpmc_sender_close() {
510 let (tx, rx) = bounded_tx_blocking_rx_future::<i32>(10);
511 let total_msg_count = 5;
512 for i in 0..total_msg_count {
513 let _ = tx.send(i);
514 }
515 drop(tx);
516 let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
517 rt.block_on(async move {
518 let mut recv_msg_count = 0;
519 loop {
520 match rx.recv().await {
521 Ok(_) => {
522 recv_msg_count += 1;
523 }
524 Err(_) => {
525 break;
526 }
527 }
528 }
529 assert_eq!(recv_msg_count, total_msg_count);
530 });
531 }
532
    /// Single-threaded runtime, async tx + async rx: exercises try_recv on
    /// empty, try_send on full, then an async receiver draining 12 messages
    /// and observing the close after the sender is dropped.
    #[test]
    fn test_future_both_1_thread_single() {
        let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
        rt.block_on(async move {
            let (tx, rx) = bounded_future_both::<i32>(10);
            // Empty channel: try_recv must fail with the "empty" error.
            let rx_res = rx.try_recv();
            assert!(rx_res.is_err());
            assert!(rx_res.unwrap_err().is_empty());
            // Fill to capacity (10) without blocking.
            for i in 0i32..10 {
                let tx_res = tx.try_send(i);
                assert!(tx_res.is_ok());
            }
            // Full channel: try_send must fail with the "full" error.
            let tx_res = tx.try_send(11);
            assert!(tx_res.is_err());
            assert!(tx_res.unwrap_err().is_full());

            let (noti_tx, noti_rx) = tokio::sync::oneshot::channel::<bool>();
            tokio::spawn(async move {
                // Expect the 10 buffered messages plus the 2 sent below,
                // in FIFO order.
                for i in 0i32..12 {
                    match rx.recv().await {
                        Ok(j) => {
                            println!("recv {}", i);
                            assert_eq!(i, j);
                        }
                        Err(e) => {
                            panic!("error {}", e);
                        }
                    }
                }
                // Sender dropped by then — recv must report closed.
                let res = rx.recv().await;
                assert!(res.is_err());
                println!("rx close");
                let _ = noti_tx.send(true);
            });
            // These two sends block on a full channel until the receiver
            // task drains; the sleep forces interleaving.
            assert!(tx.send(10).await.is_ok());
            tokio::time::sleep(Duration::from_secs(1)).await;
            assert!(tx.send(11).await.is_ok());
            drop(tx);
            let _ = noti_rx.await;
        });
    }
574
    /// Blocking tx + async rx on a 1-worker runtime: same scenario as
    /// `test_future_both_1_thread_single`, but the sender side uses the
    /// blocking `send` API from inside `block_on` while the receiver task
    /// runs on the worker thread.
    #[test]
    fn test_tx_blocking_rx_future_1_thread_single() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .worker_threads(1)
            .build()
            .unwrap();
        rt.block_on(async move {
            let (tx, rx) = bounded_tx_blocking_rx_future::<i32>(10);
            // Empty channel: try_recv must fail with the "empty" error.
            let rx_res = rx.try_recv();
            assert!(rx_res.is_err());
            assert!(rx_res.unwrap_err().is_empty());
            // Fill to capacity (10); these blocking sends succeed immediately.
            for i in 0i32..10 {
                let tx_res = tx.send(i);
                assert!(tx_res.is_ok());
            }
            // Full channel: try_send must fail with the "full" error.
            let tx_res = tx.try_send(11);
            assert!(tx_res.is_err());
            assert!(tx_res.unwrap_err().is_full());

            let (noti_tx, noti_rx) = tokio::sync::oneshot::channel::<bool>();
            tokio::spawn(async move {
                // Expect the 10 buffered messages plus the 2 sent below,
                // in FIFO order.
                for i in 0i32..12 {
                    match rx.recv().await {
                        Ok(j) => {
                            println!("recv {}", i);
                            assert_eq!(i, j);
                        }
                        Err(e) => {
                            panic!("error {}", e);
                        }
                    }
                }
                // Sender dropped by then — recv must report closed.
                let res = rx.recv().await;
                assert!(res.is_err());
                println!("rx close");
                let _ = noti_tx.send(true);
            });
            // Blocking sends on a full channel; the receiver task on the
            // worker thread drains while this thread waits.
            assert!(tx.send(10).is_ok());
            tokio::time::sleep(Duration::from_secs(1)).await;
            assert!(tx.send(11).is_ok());
            drop(tx);
            let _ = noti_rx.await;
        });
    }
620
    /// Async tx + blocking rx on a 1-worker runtime: the receiver uses the
    /// blocking `recv` from inside `block_on` while a sender task trickles
    /// messages from the worker thread.
    #[test]
    fn test_tx_future_rx_blocking_1_thread_single() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .worker_threads(1)
            .build()
            .unwrap();
        rt.block_on(async move {
            let (tx, rx) = bounded_tx_future_rx_blocking::<i32>(10);
            // Empty channel: try_recv must fail with the "empty" error.
            let rx_res = rx.try_recv();
            assert!(rx_res.is_err());
            assert!(rx_res.unwrap_err().is_empty());
            // Fill to capacity (10).
            for i in 0i32..10 {
                let tx_res = tx.send(i).await;
                assert!(tx_res.is_ok());
            }
            // Full channel: try_send must fail with the "full" error.
            let tx_res = tx.try_send(11);
            assert!(tx_res.is_err());
            assert!(tx_res.unwrap_err().is_full());

            let (noti_tx, noti_rx) = tokio::sync::oneshot::channel::<bool>();
            tokio::spawn(async move {
                // Send 5 more (values 10..15), one per second; `tx` drops
                // when this task ends, closing the channel.
                for i in 0i32..5 {
                    assert!(tx.send(10 + i).await.is_ok());
                    tokio::time::sleep(Duration::from_secs(1)).await;
                }
                println!("tx close");
                let _ = noti_tx.send(true);
            });
            // Blocking recv of all 15 messages in FIFO order.
            for i in 0i32..15 {
                match rx.recv() {
                    Ok(j) => {
                        println!("recv {}", i);
                        assert_eq!(i, j);
                    }
                    Err(e) => {
                        panic!("error {}", e);
                    }
                }
            }
            // Sender task finished and dropped tx — recv must report closed.
            let res = rx.recv();
            assert!(res.is_err());
            drop(rx);
            let _ = noti_rx.await;
        });
    }
667
    // Multi-producer/multi-consumer stress variants of the future-both
    // channel: (worker threads, sender tasks, receiver tasks).
    #[test]
    fn test_future_both_1_thread_multi_4tx_2rx() {
        _future_both_thread_multi(1, 4, 2);
    }

    #[test]
    fn test_future_both_2_thread_multi_4tx_2rx() {
        _future_both_thread_multi(2, 4, 2);
    }

    #[test]
    fn test_future_both_8_thread_multi_4tx_4rx() {
        _future_both_thread_multi(8, 4, 4);
    }
682
    /// Stress helper: `tx_count` async senders each send `round` messages to
    /// `rx_count` async receivers over a capacity-10 channel; verifies that
    /// exactly `round * tx_count` messages were received in total.
    fn _future_both_thread_multi(real_threads: usize, tx_count: usize, rx_count: usize) {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(real_threads)
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async move {
            let (tx, rx) = bounded_future_both::<i32>(10);
            // Each task reports its index here when it finishes.
            let (noti_tx, mut noti_rx) = tokio::sync::mpsc::channel::<usize>(tx_count + rx_count);

            let counter = Arc::new(AtomicI32::new(0));
            let round = 100000;
            println!("");
            for _tx_i in 0..tx_count {
                let _tx = tx.clone();
                let mut _noti_tx = noti_tx.clone();
                let _round = round;
                tokio::spawn(async move {
                    for i in 0i32.._round {
                        match _tx.send(i).await {
                            Err(e) => panic!("{}", e),
                            _ => {}
                        }
                    }
                    let _ = _noti_tx.send(_tx_i).await;
                    println!("tx {} exit", _tx_i);
                });
            }
            for _rx_i in 0..rx_count {
                let _rx = rx.clone();
                let mut _noti_tx = noti_tx.clone();
                let _counter = counter.clone();
                tokio::spawn(async move {
                    // Count every received message until the channel closes.
                    'A: loop {
                        match _rx.recv().await {
                            Ok(_i) => {
                                _counter.as_ref().fetch_add(1i32, Ordering::SeqCst);
                            }
                            Err(_) => break 'A,
                        }
                    }
                    let _ = _noti_tx.send(_rx_i).await;
                    println!("rx {} exit", _rx_i);
                });
            }
            // Drop the originals so the clones in the tasks are the only
            // handles left; when the senders finish, receivers see "closed".
            drop(tx);
            drop(rx);
            drop(noti_tx);
            // Wait for every sender and receiver task to report completion.
            for _ in 0..(rx_count + tx_count) {
                match noti_rx.recv().await {
                    Some(_) => {}
                    None => break,
                }
            }
            assert_eq!(counter.as_ref().load(Ordering::Acquire), round * (tx_count as i32));
        });
    }
741
    /// Stress helper: `tx_count` blocking sender threads share a single
    /// atomic message budget of `round`, while `rx_count` async receiver
    /// tasks count deliveries; verifies exactly `round` messages arrive.
    fn _tx_blocking_rx_future_multi(real_threads: usize, tx_count: usize, rx_count: usize) {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(real_threads)
            .enable_all()
            .build()
            .unwrap();
        let (tx, rx) = bounded_tx_blocking_rx_future::<i32>(10);
        let counter = Arc::new(AtomicI32::new(0));
        let round = 100000;
        let mut tx_ths = Vec::new();
        // Shared budget: senders race to claim message indices until `round`.
        let send_msg = Arc::new(AtomicI32::new(0));
        for _tx_i in 0..tx_count {
            let _tx = tx.clone();
            let _round = round;
            let _send_msg = send_msg.clone();
            tx_ths.push(thread::spawn(move || {
                loop {
                    let i = _send_msg.fetch_add(1, Ordering::SeqCst);
                    if i >= round {
                        break;
                    }
                    match _tx.send(i) {
                        Err(e) => panic!("{}", e),
                        _ => {
                        }
                    }
                }
                println!("tx {} exit", _tx_i);
            }));
        }
        // Drop the original so the sender threads hold the only tx handles.
        drop(tx);
        rt.block_on(async move {
            let (noti_tx, mut noti_rx) = tokio::sync::mpsc::channel::<usize>(rx_count);
            for _rx_i in 0..rx_count {
                let _rx = rx.clone();
                let mut _noti_tx = noti_tx.clone();
                let _counter = counter.clone();
                tokio::spawn(async move {
                    // Count every received message until the channel closes.
                    'A: loop {
                        match _rx.recv().await {
                            Ok(_i) => {
                                _counter.as_ref().fetch_add(1i32, Ordering::SeqCst);
                            }
                            Err(_) => break 'A,
                        }
                    }
                    println!("rx {} exiting", _rx_i);
                    let _ = _noti_tx.send(_rx_i).await;
                    println!("rx {} exit", _rx_i);
                });
            }
            drop(rx);
            drop(noti_tx);
            // Wait for every receiver task to report completion.
            for _ in 0..(rx_count) {
                match noti_rx.recv().await {
                    Some(_) => {}
                    None => break,
                }
            }
            assert_eq!(counter.as_ref().load(Ordering::Acquire), round as i32);
        });
        for th in tx_ths {
            let _ = th.join();
        }
    }
809
    // Multi-producer/multi-consumer stress variants of the blocking-tx /
    // async-rx channel: (worker threads, sender threads, receiver tasks).
    #[test]
    fn test_tx_blocking_rx_future_1_thread_multi_4tx_2rx() {
        _tx_blocking_rx_future_multi(1, 4, 2);
    }

    #[test]
    fn test_tx_blocking_rx_future_2_thread_multi_4tx_3rx() {
        _tx_blocking_rx_future_multi(2, 4, 3);
    }

    #[test]
    fn test_tx_blocking_rx_future_8_thread_multi_4tx_4rx() {
        _tx_blocking_rx_future_multi(8, 4, 4);
    }
824
    /// Stress helper: `tx_count` async sender tasks each send `round`
    /// messages to `rx_count` blocking receiver threads; verifies exactly
    /// `round * tx_count` messages arrive in total.
    fn _tx_future_rx_blocking_multi(real_threads: usize, tx_count: usize, rx_count: usize) {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(real_threads)
            .enable_all()
            .build()
            .unwrap();
        let (tx, rx) = bounded_tx_future_rx_blocking::<i32>(10);
        let counter = Arc::new(AtomicI32::new(0));
        let round = 100000;
        let mut rx_ths = Vec::new();
        for _rx_i in 0..rx_count {
            let _rx = rx.clone();
            let _round = round;
            let _counter = counter.clone();
            rx_ths.push(thread::spawn(move || {
                // Count every received message until the channel closes.
                'A: loop {
                    match _rx.recv() {
                        Ok(_) => {
                            _counter.as_ref().fetch_add(1i32, Ordering::SeqCst);
                        }
                        Err(_) => break 'A,
                    }
                }
                println!("rx {} exit", _rx_i);
            }));
        }
        // Drop the original so the receiver threads hold the only rx handles.
        drop(rx);
        rt.block_on(async move {
            let (noti_tx, mut noti_rx) = tokio::sync::mpsc::channel::<usize>(tx_count);
            for _tx_i in 0..tx_count {
                let _tx = tx.clone();
                let mut _noti_tx = noti_tx.clone();
                tokio::spawn(async move {
                    for i in 0i32..round {
                        match _tx.send(i).await {
                            Err(e) => panic!("{}", e),
                            _ => {}
                        }
                    }
                    let _ = _noti_tx.send(_tx_i).await;
                    println!("tx {} exit", _tx_i);
                });
            }
            drop(tx);
            drop(noti_tx);
            // Wait for every sender task to report completion.
            for _ in 0..(tx_count) {
                match noti_rx.recv().await {
                    Some(_) => {}
                    None => break,
                }
            }
        });
        // Receivers exit once all senders are gone and the buffer is drained.
        for th in rx_ths {
            let _ = th.join();
        }
        assert_eq!(counter.as_ref().load(Ordering::Acquire), round * (tx_count as i32));
    }
883
    // Multi-producer/multi-consumer stress variants of the async-tx /
    // blocking-rx channel: (worker threads, sender tasks, receiver threads).
    #[test]
    fn test_tx_future_rx_blocking_1_thread_multi_4tx_2rx() {
        _tx_future_rx_blocking_multi(1, 4, 2);
    }

    #[test]
    fn test_tx_future_rx_blocking_2_thread_multi_4tx_3rx() {
        _tx_future_rx_blocking_multi(2, 4, 3);
    }

    #[test]
    fn test_tx_future_rx_blocking_8_thread_multi_4tx_4rx() {
        _tx_future_rx_blocking_multi(8, 4, 4);
    }
898
    /// An async `recv` on an empty channel must remain pending (so
    /// `tokio::time::timeout` fires), yet the same receiver must still be
    /// woken and complete once a message is finally sent.
    #[test]
    fn test_timeout_future_both() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async move {
            let (tx, rx) = bounded_future_both::<i32>(100);
            // Nothing sent yet: recv must not resolve within 1s.
            assert!(timeout(Duration::from_secs(1), rx.recv()).await.is_err());
            let (tx_done, rx_done) = crossbeam::channel::bounded::<i32>(1);
            tokio::spawn(async move {
                match rx.recv().await {
                    Ok(item) => {
                        let _ = tx_done.send(item);
                    }
                    Err(_e) => {
                        println!("recv error");
                    }
                }
            });
            // The previously timed-out receiver must still receive this.
            let _ = tx.send(1).await;
            assert_eq!(rx_done.recv().unwrap(), 1);
        });
    }
924}