parallel_try_recv/
parallel-try_recv.rs

1//! send/try_recv 10,000,000 messages in parallel:
2//!
3//! ~30-125ns per send/try_recv
4
5extern crate unbounded_spsc;
6
7const MESSAGE_COUNT     : u64 = 10_000_000;
8
9static SENDER_STARTED   : std::sync::atomic::AtomicBool =
10  std::sync::atomic::AtomicBool::new (false);
11static RECEIVER_STARTED : std::sync::atomic::AtomicBool =
12  std::sync::atomic::AtomicBool::new (false);
13
14#[derive(Debug,PartialEq)]
15struct Mystruct {
16  x : f64,
17  y : f64,
18  z : f64
19}
20
21fn sendfun (sender : unbounded_spsc::Sender <Mystruct>) {
22  let mut counter = 0;
23  SENDER_STARTED.store (true, std::sync::atomic::Ordering::SeqCst);
24  // spin until receiver is started
25  while !RECEIVER_STARTED.load (std::sync::atomic::Ordering::SeqCst) {}
26  let start_time = std::time::SystemTime::now();
27  while counter < MESSAGE_COUNT {
28    sender.send (Mystruct { x: counter as f64, y: 1.5, z: 2.0 }).unwrap();
29    counter += 1;
30  }
31  let duration = start_time.elapsed().unwrap();
32  let duration_ns
33    = (duration.as_secs() * 1_000_000_000) + duration.subsec_nanos() as u64;
34  println!("sendfun duration ns: {}", duration_ns);
35  println!("sendfun ns per message: {}", duration_ns / MESSAGE_COUNT);
36}
37
38fn recvfun (receiver : unbounded_spsc::Receiver <Mystruct>) {
39  RECEIVER_STARTED.store (true, std::sync::atomic::Ordering::SeqCst);
40  // spin until sender is started
41  while !SENDER_STARTED.load (std::sync::atomic::Ordering::SeqCst) {}
42  let start_time = std::time::SystemTime::now();
43  loop {
44    match receiver.try_recv() {
45      Ok  (_m) => (),
46      Err (unbounded_spsc::TryRecvError::Empty) => (),
47      Err (unbounded_spsc::TryRecvError::Disconnected) => break
48    }
49  }
50  let duration = start_time.elapsed().unwrap();
51  let duration_ns
52    = (duration.as_secs() * 1_000_000_000) + duration.subsec_nanos() as u64;
53  println!("recvfun duration ns: {}", duration_ns);
54  println!("recvfun ns per message: {}", duration_ns / MESSAGE_COUNT);
55  println!("buffer ending capacity: {}", receiver.capacity());
56}
57
58fn main() {
59  println!("main...");
60  let (sender, receiver) = unbounded_spsc::channel();
61  let join_sender = std::thread::spawn (move || sendfun (sender));
62  let join_receiver = std::thread::spawn (move || recvfun (receiver));
63  join_sender.join().unwrap();
64  join_receiver.join().unwrap();
65  println!("...main");
66}