crossfire/mpmc/
unbounded.rs

1use super::rx::*;
2use super::tx::*;
3use crate::channel::*;
4use crossbeam::queue::SegQueue;
5use std::sync::{
6    atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
7    Arc,
8};
9use std::task::*;
10
/// Receiving half handed out by `unbounded_future`: an `RxFuture` whose shared state is
/// `UnboundedSharedFuture`.
11pub type RxUnbounded<T> = RxFuture<T, UnboundedSharedFuture>;
/// Sending half handed out by `unbounded_future`: a `TxBlocking` whose shared state is
/// `UnboundedSharedFuture`.
12pub type TxUnbounded<T> = TxBlocking<T, UnboundedSharedFuture>;
13
14/// Initiate a unbounded channel.
15/// Sender will never block, so we use the same TxBlocking for threads
16pub fn unbounded_future<T: Unpin>() -> (TxUnbounded<T>, RxUnbounded<T>) {
17    let (tx, rx) = crossbeam::channel::unbounded();
18    let shared = Arc::new(UnboundedSharedFuture::new());
19
20    let tx_f = TxBlocking::new(tx, shared.clone());
21    let rx_f = RxFuture::new(rx, shared);
22    (tx_f, rx_f)
23}
24
25pub struct UnboundedSharedFuture {
26    tx_count: AtomicUsize,
27    rx_count: AtomicUsize,
28    recv_waker: SegQueue<LockedWakerRef>,
29    recv_waker_tx_seq: AtomicU64,
30    recv_waker_rx_seq: AtomicU64,
31    checking_recv: AtomicBool,
32}
33
34impl MPMCShared for UnboundedSharedFuture {
35    fn new() -> Self {
36        Self {
37            recv_waker: SegQueue::new(),
38            tx_count: AtomicUsize::new(1),
39            rx_count: AtomicUsize::new(1),
40            recv_waker_tx_seq: AtomicU64::new(0),
41            recv_waker_rx_seq: AtomicU64::new(0),
42            checking_recv: AtomicBool::new(false),
43        }
44    }
45
46    #[inline]
47    fn on_recv(&self) {}
48
49    #[inline]
50    fn on_send(&self) {
51        on_send_m!(self)
52    }
53
54    #[inline]
55    fn reg_recv(&self, ctx: &mut Context) -> Option<LockedWaker> {
56        reg_recv_m!(self, ctx)
57    }
58
59    #[inline]
60    fn reg_send(&self, _ctx: &mut Context) -> Option<LockedWaker> {
61        None
62    }
63
64    #[inline(always)]
65    fn add_tx(&self) {
66        let _ = self.tx_count.fetch_add(1, Ordering::SeqCst);
67    }
68
69    #[inline(always)]
70    fn add_rx(&self) {
71        let _ = self.rx_count.fetch_add(1, Ordering::SeqCst);
72    }
73
74    #[inline]
75    fn close_tx(&self) {
76        close_tx_common!(self)
77    }
78
79    #[inline]
80    fn close_rx(&self) {
81        let _ = self.rx_count.fetch_sub(1, Ordering::SeqCst);
82        return;
83    }
84
85    #[inline]
86    fn get_tx_count(&self) -> usize {
87        self.tx_count.load(Ordering::SeqCst)
88    }
89
90    fn get_waker_length(&self) -> (usize, usize) {
91        (0, self.recv_waker.len())
92    }
93
94    #[inline]
95    fn clear_recv_wakers(&self, waker: LockedWaker) {
96        clear_recv_wakers_common!(self, waker.get_seq())
97    }
98}
99
100#[cfg(test)]
101mod tests {
102
103    extern crate tokio;
104    use super::*;
105    use std::sync::atomic::{AtomicI32, Ordering};
106    use std::thread;
107    use std::time::Instant;
108    use tokio::time::Duration;
109
110    #[test]
111    fn bench_std_channel_performance() {
112        println!();
113        let total_message = 1000000;
114        let (tx, rx) = std::sync::mpsc::channel();
115        let start = Instant::now();
116        thread::spawn(move || {
117            let _tx = tx.clone();
118            for i in 0..total_message {
119                let _ = _tx.send(i);
120            }
121        });
122
123        for _ in 0..total_message {
124            rx.recv().unwrap();
125        }
126        let end = Instant::now();
127
128        println!("{} message, single sender thread single receiver thread use std::sync::sync_channel, cost time:{} s",
129                 total_message, (total_message as f64) / end.duration_since(start).as_secs_f64());
130    }
131
132    #[test]
133    fn bench_crossbeam_channel_performance() {
134        println!();
135        let total_message = 1000000;
136        let (tx, rx) = crossbeam::channel::unbounded();
137        let start = Instant::now();
138        thread::spawn(move || {
139            let _tx = tx.clone();
140            for i in 0..total_message {
141                let _ = _tx.send(i);
142            }
143        });
144
145        for _ in 0..total_message {
146            rx.recv().unwrap();
147        }
148        let end = Instant::now();
149
150        println!(
151            "{} message, single sender thread single receiver thread use crossbeam::channel, {} /s",
152            total_message,
153            (total_message as f64) / end.duration_since(start).as_secs_f64()
154        );
155    }
156
157    #[test]
158    fn bench_tokio_mpsc_performance() {
159        println!();
160        let rt = tokio::runtime::Builder::new_multi_thread()
161            .worker_threads(2)
162            .enable_all()
163            .build()
164            .unwrap();
165        rt.block_on(async move {
166            let total_message = 1000000;
167            let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<i32>();
168            let start = Instant::now();
169            tokio::spawn(async move {
170                println!("sender thread send {} message start", total_message);
171                let mut _tx = tx.clone();
172                for i in 0i32..total_message {
173                    let _ = _tx.send(i);
174                }
175                println!("sender thread send {} message end", total_message);
176            });
177
178            println!("receiver thread recv {} message start", total_message);
179            for _ in 0..total_message {
180                rx.recv().await;
181            }
182            println!("receiver thread recv {} message end", total_message);
183            let end = Instant::now();
184
185            println!("{} message, single sender thread single receiver thread use tokio::sync::channel, {} /s",
186                     total_message, (total_message as f64) / end.duration_since(start).as_secs_f64());
187        });
188    }
189
190    #[test]
191    fn bench_tx_blocking_rx_future_performance() {
192        println!();
193        let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
194        let total_message = 1000000;
195        let (tx, rx_f) = unbounded_future::<i32>();
196        let start = Instant::now();
197        thread::spawn(move || {
198            for i in 0..total_message {
199                let _ = tx.send(i);
200            }
201        });
202        rt.block_on(async move {
203            for _ in 0..total_message {
204                let _ = rx_f.recv().await;
205            }
206            let end = Instant::now();
207
208            println!(
209                "{} message, single sender thread single receiver thread use mpmc {} /s",
210                total_message,
211                (total_message as f64) / end.duration_since(start).as_secs_f64()
212            );
213        });
214    }
215
216    fn _tx_blocking_rx_future_multi(real_threads: usize, tx_count: usize) {
217        let rt = tokio::runtime::Builder::new_multi_thread()
218            .worker_threads(real_threads)
219            .enable_all()
220            .build()
221            .unwrap();
222        let (tx, rx) = unbounded_future::<i32>();
223        let counter = Arc::new(AtomicI32::new(0));
224        let round = 100000;
225        let mut tx_ths = Vec::new();
226        for _tx_i in 0..tx_count {
227            let _tx = tx.clone();
228            let _round = round;
229            tx_ths.push(thread::spawn(move || {
230                for i in 0i32.._round {
231                    match _tx.send(i) {
232                        Err(e) => panic!("{}", e),
233                        _ => {}
234                    }
235                }
236                println!("tx {} exit", _tx_i);
237            }));
238        }
239        drop(tx);
240        rt.block_on(async move {
241            'A: loop {
242                match rx.recv().await {
243                    Ok(_) => {
244                        counter.as_ref().fetch_add(1i32, Ordering::SeqCst);
245                        //print!("{} {}\r", _rx_i, i);
246                    }
247                    Err(_) => break 'A,
248                }
249            }
250            assert_eq!(counter.as_ref().load(Ordering::Acquire), round * (tx_count as i32));
251        });
252        for th in tx_ths {
253            let _ = th.join();
254        }
255    }
256
257    #[test]
258    fn test_tx_blocking_rx_future_1_thread_multi_4tx() {
259        _tx_blocking_rx_future_multi(1, 4);
260    }
261
262    #[test]
263    fn test_tx_blocking_rx_future_2_thread_multi_4tx() {
264        _tx_blocking_rx_future_multi(2, 4);
265    }
266
267    #[test]
268    fn test_tx_blocking_rx_future_8_thread_multi_4tx() {
269        _tx_blocking_rx_future_multi(8, 4);
270    }
271
272    #[test]
273    fn test_tx_unbounded_idle_select() {
274        use futures::{pin_mut, select, FutureExt};
275        let rt = tokio::runtime::Builder::new_multi_thread()
276            .worker_threads(2)
277            .enable_all()
278            .build()
279            .unwrap();
280        let (_tx, rx_f) = unbounded_future::<i32>();
281
282        async fn loop_fn() {
283            tokio::time::sleep(Duration::from_millis(1)).await;
284        }
285
286        rt.block_on(async move {
287            let mut c = rx_f.make_recv_future().fuse();
288            for _ in 0..1000 {
289                {
290                    let f = loop_fn().fuse();
291                    pin_mut!(f);
292                    select! {
293                        _ = f => {
294                            let (_tx_wakers, _rx_wakers) = rx_f.get_waker_length();
295                            //println!("waker tx{} rx {}", _tx_wakers, _rx_wakers);
296                        },
297                        _ = c => {
298                            unreachable!()
299                        },
300                    }
301                }
302            }
303            let (tx_wakers, rx_wakers) = rx_f.get_waker_length();
304            assert_eq!(tx_wakers, 0);
305            println!("waker rx {}", rx_wakers);
306        });
307    }
308}