crossfire/mpmc/
unbounded.rs1use super::rx::*;
2use super::tx::*;
3use crate::channel::*;
4use crossbeam::queue::SegQueue;
5use std::sync::{
6 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
7 Arc,
8};
9use std::task::*;
10
11pub type RxUnbounded<T> = RxFuture<T, UnboundedSharedFuture>;
12pub type TxUnbounded<T> = TxBlocking<T, UnboundedSharedFuture>;
13
14pub fn unbounded_future<T: Unpin>() -> (TxUnbounded<T>, RxUnbounded<T>) {
17 let (tx, rx) = crossbeam::channel::unbounded();
18 let shared = Arc::new(UnboundedSharedFuture::new());
19
20 let tx_f = TxBlocking::new(tx, shared.clone());
21 let rx_f = RxFuture::new(rx, shared);
22 (tx_f, rx_f)
23}
24
25pub struct UnboundedSharedFuture {
26 tx_count: AtomicUsize,
27 rx_count: AtomicUsize,
28 recv_waker: SegQueue<LockedWakerRef>,
29 recv_waker_tx_seq: AtomicU64,
30 recv_waker_rx_seq: AtomicU64,
31 checking_recv: AtomicBool,
32}
33
34impl MPMCShared for UnboundedSharedFuture {
35 fn new() -> Self {
36 Self {
37 recv_waker: SegQueue::new(),
38 tx_count: AtomicUsize::new(1),
39 rx_count: AtomicUsize::new(1),
40 recv_waker_tx_seq: AtomicU64::new(0),
41 recv_waker_rx_seq: AtomicU64::new(0),
42 checking_recv: AtomicBool::new(false),
43 }
44 }
45
46 #[inline]
47 fn on_recv(&self) {}
48
49 #[inline]
50 fn on_send(&self) {
51 on_send_m!(self)
52 }
53
54 #[inline]
55 fn reg_recv(&self, ctx: &mut Context) -> Option<LockedWaker> {
56 reg_recv_m!(self, ctx)
57 }
58
59 #[inline]
60 fn reg_send(&self, _ctx: &mut Context) -> Option<LockedWaker> {
61 None
62 }
63
64 #[inline(always)]
65 fn add_tx(&self) {
66 let _ = self.tx_count.fetch_add(1, Ordering::SeqCst);
67 }
68
69 #[inline(always)]
70 fn add_rx(&self) {
71 let _ = self.rx_count.fetch_add(1, Ordering::SeqCst);
72 }
73
74 #[inline]
75 fn close_tx(&self) {
76 close_tx_common!(self)
77 }
78
79 #[inline]
80 fn close_rx(&self) {
81 let _ = self.rx_count.fetch_sub(1, Ordering::SeqCst);
82 return;
83 }
84
85 #[inline]
86 fn get_tx_count(&self) -> usize {
87 self.tx_count.load(Ordering::SeqCst)
88 }
89
90 fn get_waker_length(&self) -> (usize, usize) {
91 (0, self.recv_waker.len())
92 }
93
94 #[inline]
95 fn clear_recv_wakers(&self, waker: LockedWaker) {
96 clear_recv_wakers_common!(self, waker.get_seq())
97 }
98}
99
100#[cfg(test)]
101mod tests {
102
103 extern crate tokio;
104 use super::*;
105 use std::sync::atomic::{AtomicI32, Ordering};
106 use std::thread;
107 use std::time::Instant;
108 use tokio::time::Duration;
109
110 #[test]
111 fn bench_std_channel_performance() {
112 println!();
113 let total_message = 1000000;
114 let (tx, rx) = std::sync::mpsc::channel();
115 let start = Instant::now();
116 thread::spawn(move || {
117 let _tx = tx.clone();
118 for i in 0..total_message {
119 let _ = _tx.send(i);
120 }
121 });
122
123 for _ in 0..total_message {
124 rx.recv().unwrap();
125 }
126 let end = Instant::now();
127
128 println!("{} message, single sender thread single receiver thread use std::sync::sync_channel, cost time:{} s",
129 total_message, (total_message as f64) / end.duration_since(start).as_secs_f64());
130 }
131
132 #[test]
133 fn bench_crossbeam_channel_performance() {
134 println!();
135 let total_message = 1000000;
136 let (tx, rx) = crossbeam::channel::unbounded();
137 let start = Instant::now();
138 thread::spawn(move || {
139 let _tx = tx.clone();
140 for i in 0..total_message {
141 let _ = _tx.send(i);
142 }
143 });
144
145 for _ in 0..total_message {
146 rx.recv().unwrap();
147 }
148 let end = Instant::now();
149
150 println!(
151 "{} message, single sender thread single receiver thread use crossbeam::channel, {} /s",
152 total_message,
153 (total_message as f64) / end.duration_since(start).as_secs_f64()
154 );
155 }
156
157 #[test]
158 fn bench_tokio_mpsc_performance() {
159 println!();
160 let rt = tokio::runtime::Builder::new_multi_thread()
161 .worker_threads(2)
162 .enable_all()
163 .build()
164 .unwrap();
165 rt.block_on(async move {
166 let total_message = 1000000;
167 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<i32>();
168 let start = Instant::now();
169 tokio::spawn(async move {
170 println!("sender thread send {} message start", total_message);
171 let mut _tx = tx.clone();
172 for i in 0i32..total_message {
173 let _ = _tx.send(i);
174 }
175 println!("sender thread send {} message end", total_message);
176 });
177
178 println!("receiver thread recv {} message start", total_message);
179 for _ in 0..total_message {
180 rx.recv().await;
181 }
182 println!("receiver thread recv {} message end", total_message);
183 let end = Instant::now();
184
185 println!("{} message, single sender thread single receiver thread use tokio::sync::channel, {} /s",
186 total_message, (total_message as f64) / end.duration_since(start).as_secs_f64());
187 });
188 }
189
190 #[test]
191 fn bench_tx_blocking_rx_future_performance() {
192 println!();
193 let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
194 let total_message = 1000000;
195 let (tx, rx_f) = unbounded_future::<i32>();
196 let start = Instant::now();
197 thread::spawn(move || {
198 for i in 0..total_message {
199 let _ = tx.send(i);
200 }
201 });
202 rt.block_on(async move {
203 for _ in 0..total_message {
204 let _ = rx_f.recv().await;
205 }
206 let end = Instant::now();
207
208 println!(
209 "{} message, single sender thread single receiver thread use mpmc {} /s",
210 total_message,
211 (total_message as f64) / end.duration_since(start).as_secs_f64()
212 );
213 });
214 }
215
216 fn _tx_blocking_rx_future_multi(real_threads: usize, tx_count: usize) {
217 let rt = tokio::runtime::Builder::new_multi_thread()
218 .worker_threads(real_threads)
219 .enable_all()
220 .build()
221 .unwrap();
222 let (tx, rx) = unbounded_future::<i32>();
223 let counter = Arc::new(AtomicI32::new(0));
224 let round = 100000;
225 let mut tx_ths = Vec::new();
226 for _tx_i in 0..tx_count {
227 let _tx = tx.clone();
228 let _round = round;
229 tx_ths.push(thread::spawn(move || {
230 for i in 0i32.._round {
231 match _tx.send(i) {
232 Err(e) => panic!("{}", e),
233 _ => {}
234 }
235 }
236 println!("tx {} exit", _tx_i);
237 }));
238 }
239 drop(tx);
240 rt.block_on(async move {
241 'A: loop {
242 match rx.recv().await {
243 Ok(_) => {
244 counter.as_ref().fetch_add(1i32, Ordering::SeqCst);
245 }
247 Err(_) => break 'A,
248 }
249 }
250 assert_eq!(counter.as_ref().load(Ordering::Acquire), round * (tx_count as i32));
251 });
252 for th in tx_ths {
253 let _ = th.join();
254 }
255 }
256
257 #[test]
258 fn test_tx_blocking_rx_future_1_thread_multi_4tx() {
259 _tx_blocking_rx_future_multi(1, 4);
260 }
261
262 #[test]
263 fn test_tx_blocking_rx_future_2_thread_multi_4tx() {
264 _tx_blocking_rx_future_multi(2, 4);
265 }
266
267 #[test]
268 fn test_tx_blocking_rx_future_8_thread_multi_4tx() {
269 _tx_blocking_rx_future_multi(8, 4);
270 }
271
272 #[test]
273 fn test_tx_unbounded_idle_select() {
274 use futures::{pin_mut, select, FutureExt};
275 let rt = tokio::runtime::Builder::new_multi_thread()
276 .worker_threads(2)
277 .enable_all()
278 .build()
279 .unwrap();
280 let (_tx, rx_f) = unbounded_future::<i32>();
281
282 async fn loop_fn() {
283 tokio::time::sleep(Duration::from_millis(1)).await;
284 }
285
286 rt.block_on(async move {
287 let mut c = rx_f.make_recv_future().fuse();
288 for _ in 0..1000 {
289 {
290 let f = loop_fn().fuse();
291 pin_mut!(f);
292 select! {
293 _ = f => {
294 let (_tx_wakers, _rx_wakers) = rx_f.get_waker_length();
295 },
297 _ = c => {
298 unreachable!()
299 },
300 }
301 }
302 }
303 let (tx_wakers, rx_wakers) = rx_f.get_waker_length();
304 assert_eq!(tx_wakers, 0);
305 println!("waker rx {}", rx_wakers);
306 });
307 }
308}