Skip to main content

parallel_try_recv/
parallel-try_recv.rs

1//! `send/try_recv` 10,000,000 messages in parallel:
2//!
3//! ~30-125ns per `send/try_recv`
4
5use std;
6use unbounded_spsc;
7
8const MESSAGE_COUNT     : u64 = 10_000_000;
9
10static SENDER_STARTED   : std::sync::atomic::AtomicBool =
11  std::sync::atomic::AtomicBool::new (false);
12static RECEIVER_STARTED : std::sync::atomic::AtomicBool =
13  std::sync::atomic::AtomicBool::new (false);
14
15#[derive(Debug,PartialEq)]
16struct Mystruct {
17  x : f64,
18  y : f64,
19  z : f64
20}
21
22fn sendfun (sender : unbounded_spsc::Sender <Mystruct>) {
23  let mut counter = 0;
24  SENDER_STARTED.store (true, std::sync::atomic::Ordering::SeqCst);
25  // spin until receiver is started
26  while !RECEIVER_STARTED.load (std::sync::atomic::Ordering::SeqCst) {
27    std::hint::spin_loop()
28  }
29  let start_time = std::time::SystemTime::now();
30  while counter < MESSAGE_COUNT {
31    sender.send (Mystruct { x: counter as f64, y: 1.5, z: 2.0 }).unwrap();
32    counter += 1;
33  }
34  let duration = start_time.elapsed().unwrap();
35  let duration_ns
36    = (duration.as_secs() * 1_000_000_000) + duration.subsec_nanos() as u64;
37  println!("sendfun duration ns: {duration_ns}");
38  println!("sendfun ns per message: {}", duration_ns / MESSAGE_COUNT);
39}
40
41fn recvfun (receiver : unbounded_spsc::Receiver <Mystruct>) {
42  RECEIVER_STARTED.store (true, std::sync::atomic::Ordering::SeqCst);
43  // spin until sender is started
44  while !SENDER_STARTED.load (std::sync::atomic::Ordering::SeqCst) {
45    std::hint::spin_loop()
46  }
47  let start_time = std::time::SystemTime::now();
48  loop {
49    match receiver.try_recv() {
50      Ok  (_m) => (),
51      Err (unbounded_spsc::TryRecvError::Empty) => (),
52      Err (unbounded_spsc::TryRecvError::Disconnected) => break
53    }
54  }
55  let duration = start_time.elapsed().unwrap();
56  let duration_ns
57    = (duration.as_secs() * 1_000_000_000) + duration.subsec_nanos() as u64;
58  println!("recvfun duration ns: {duration_ns}");
59  println!("recvfun ns per message: {}", duration_ns / MESSAGE_COUNT);
60  println!("buffer ending capacity: {}", receiver.capacity());
61}
62
63fn main() {
64  println!("main...");
65  let (sender, receiver) = unbounded_spsc::channel();
66  let join_sender = std::thread::spawn (move || sendfun (sender));
67  let join_receiver = std::thread::spawn (move || recvfun (receiver));
68  join_sender.join().unwrap();
69  join_receiver.join().unwrap();
70  println!("...main");
71}