// Source: pubsub_benchmark/pubsub_benchmark.rs

1use morb::{TopicPoller, create_topic};
2use std::env;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, Barrier, mpsc};
5use std::thread;
6use std::time::{Duration, Instant};
7
/// Process-wide counter that supplies the numeric suffix for generated topic names.
///
/// It starts at zero and is advanced by one each time `unique_topic_name` is called.
static TOPIC_ID: AtomicU64 = AtomicU64::new(0);
9
/// 64-byte plain-data message used as the sample for the `large_msg_64` benchmark.
#[derive(Copy, Clone)]
// `bytes` is filled in at construction but never read field-by-field by the benchmark code,
// so the compiler would otherwise flag it as dead.
#[allow(dead_code)]
struct Payload64 {
    /// Raw message contents; only the size matters to the benchmark.
    bytes: [u8; 64],
}
15
/// 256-byte plain-data message used as the sample for the `large_msg_256` benchmark.
#[derive(Copy, Clone)]
// `bytes` is filled in at construction but never read field-by-field by the benchmark code,
// so the compiler would otherwise flag it as dead.
#[allow(dead_code)]
struct Payload256 {
    /// Raw message contents; only the size matters to the benchmark.
    bytes: [u8; 256],
}
21
/// 1024-byte plain-data message used as the sample for the `large_msg_1024` benchmark.
#[derive(Copy, Clone)]
// `bytes` is filled in at construction but never read field-by-field by the benchmark code,
// so the compiler would otherwise flag it as dead.
#[allow(dead_code)]
struct Payload1024 {
    /// Raw message contents; only the size matters to the benchmark.
    bytes: [u8; 1024],
}
27
28fn unique_topic_name(prefix: &str) -> String {
29    let id = TOPIC_ID.fetch_add(1, Ordering::Relaxed);
30    format!("{prefix}_{id}")
31}
32
/// Returns the average time per operation in nanoseconds.
///
/// The division is done in `f64`, so `ops == 0` does not panic; it produces a non-finite
/// value (`inf`, or `NaN` when `duration` is also zero).
fn fmt_ns_per_op(duration: Duration, ops: u64) -> f64 {
    let total_nanos = duration.as_secs_f64() * 1_000_000_000.0;
    let op_count = ops as f64;
    total_nanos / op_count
}
36
/// Returns the throughput in operations per second.
///
/// The division is done in `f64`, so a zero `duration` does not panic; it produces a
/// non-finite value (`inf`, or `NaN` when `ops` is also zero).
fn fmt_ops_per_sec(duration: Duration, ops: u64) -> f64 {
    let seconds = duration.as_secs_f64();
    let op_count = ops as f64;
    op_count / seconds
}
40
41fn print_result(name: &str, ops: u64, elapsed: Duration) {
42    println!(
43        "{name:<18} {ops:>12} ops  {:>12.2} ns/op  {:>12.2} ops/s",
44        fmt_ns_per_op(elapsed, ops),
45        fmt_ops_per_sec(elapsed, ops),
46    );
47}
48
49fn benchmark_publish_only(iterations: u64, queue_size: u16) {
50    let topic = create_topic::<u64>(unique_topic_name("publish_only"), queue_size).unwrap();
51    let publisher = topic.create_publisher();
52
53    let started = Instant::now();
54    for value in 0..iterations {
55        publisher.publish(std::hint::black_box(value));
56    }
57    print_result("publish_only", iterations, started.elapsed());
58}
59
60fn benchmark_publish_consume(iterations: u64, queue_size: u16) {
61    let topic = create_topic::<u64>(unique_topic_name("publish_consume"), queue_size).unwrap();
62    let publisher = topic.create_publisher();
63    let mut subscriber = topic.create_subscriber();
64
65    let started = Instant::now();
66    for value in 0..iterations {
67        publisher.publish(std::hint::black_box(value));
68        std::hint::black_box(subscriber.check_update_and_copy().unwrap());
69    }
70    print_result("publish_consume", iterations, started.elapsed());
71}
72
73fn benchmark_publish_poll(iterations: u64, queue_size: u16) {
74    let topic = create_topic::<u64>(unique_topic_name("publish_poll"), queue_size).unwrap();
75    let publisher = topic.create_publisher();
76    let mut poller = TopicPoller::new();
77    poller.add_topic(&topic).unwrap();
78
79    let started = Instant::now();
80    for value in 0..iterations {
81        publisher.publish(std::hint::black_box(value));
82        poller.wait(Some(Duration::ZERO)).unwrap();
83        for token in poller.iter() {
84            std::hint::black_box(token);
85        }
86    }
87    print_result("publish_poll", iterations, started.elapsed());
88}
89
90fn benchmark_multi_producer(iterations: u64) {
91    const PRODUCERS: usize = 4;
92
93    let topic = create_topic::<u64>(unique_topic_name("multi_producer"), 1024).unwrap();
94    let barrier = Arc::new(Barrier::new(PRODUCERS + 1));
95    let per_producer = iterations / PRODUCERS as u64;
96    let total_ops = per_producer * PRODUCERS as u64;
97
98    let mut handles = Vec::new();
99    for producer_id in 0..PRODUCERS {
100        let barrier = barrier.clone();
101        let publisher = topic.create_publisher();
102        handles.push(thread::spawn(move || {
103            barrier.wait();
104            for seq in 0..per_producer {
105                publisher.publish((producer_id as u64 * per_producer) + seq);
106            }
107        }));
108    }
109
110    let started = Instant::now();
111    barrier.wait();
112    for handle in handles {
113        handle.join().unwrap();
114    }
115
116    print_result("multi_producer", total_ops, started.elapsed());
117}
118
119fn benchmark_multi_subscriber(iterations: u64) {
120    const SUBSCRIBERS: usize = 4;
121
122    let messages = iterations.min(50_000);
123    let queue_size = messages as u16;
124    let topic = create_topic::<u64>(unique_topic_name("multi_subscriber"), queue_size).unwrap();
125    let publisher = topic.create_publisher();
126    let start_barrier = Arc::new(Barrier::new(SUBSCRIBERS + 1));
127
128    let mut handles = Vec::new();
129    for _ in 0..SUBSCRIBERS {
130        let start_barrier = start_barrier.clone();
131        let mut subscriber = topic.create_subscriber();
132        handles.push(thread::spawn(move || {
133            start_barrier.wait();
134            for _ in 0..messages {
135                loop {
136                    if let Some(value) = subscriber.check_update_and_copy() {
137                        std::hint::black_box(value);
138                        break;
139                    }
140                    thread::yield_now();
141                }
142            }
143        }));
144    }
145
146    let started = Instant::now();
147    start_barrier.wait();
148    for value in 0..messages {
149        publisher.publish(std::hint::black_box(value));
150    }
151    for handle in handles {
152        handle.join().unwrap();
153    }
154
155    print_result(
156        "multi_subscriber",
157        messages * SUBSCRIBERS as u64,
158        started.elapsed(),
159    );
160}
161
162fn benchmark_large_message<T>(name: &str, iterations: u64, sample: T)
163where
164    T: Clone + Copy + Send + Sync + 'static,
165{
166    let topic = create_topic::<T>(unique_topic_name(name), 1024).unwrap();
167    let publisher = topic.create_publisher();
168    let mut subscriber = topic.create_subscriber();
169
170    let started = Instant::now();
171    for _ in 0..iterations {
172        publisher.publish(std::hint::black_box(sample));
173        std::hint::black_box(subscriber.check_update_and_copy().unwrap());
174    }
175
176    print_result(name, iterations, started.elapsed());
177}
178
179fn benchmark_blocking_poll(iterations: u64) {
180    let waits = iterations.min(50_000);
181    let topic = create_topic::<u64>(unique_topic_name("blocking_poll"), 1024).unwrap();
182    let publisher = topic.create_publisher();
183    let (tx, rx) = mpsc::sync_channel::<()>(0);
184    let topic_for_thread = topic.clone();
185
186    let handle = thread::spawn(move || {
187        for value in 0..waits {
188            rx.recv().unwrap();
189            publisher.publish(value);
190        }
191        drop(topic_for_thread);
192    });
193
194    let mut poller = TopicPoller::new();
195    poller.add_topic(&topic).unwrap();
196
197    let started = Instant::now();
198    for _ in 0..waits {
199        tx.send(()).unwrap();
200        poller.wait(Some(Duration::from_secs(1))).unwrap();
201        for token in poller.iter() {
202            std::hint::black_box(token);
203        }
204        topic.clear_event();
205    }
206    handle.join().unwrap();
207
208    print_result("blocking_poll", waits, started.elapsed());
209}
210
/// Reads the iteration count from the first command-line argument.
///
/// Returns the parsed value when the argument is a valid `u64`. Returns 1,000,000 when the
/// argument is absent or cannot be parsed; in the latter case a warning is written to stderr
/// so a mistyped count (for example `10k`) does not silently run the default workload.
///
/// # Panics
///
/// `env::args` panics if an argument it yields is not valid Unicode.
fn parse_iterations() -> u64 {
    const DEFAULT_ITERATIONS: u64 = 1_000_000;

    match env::args().nth(1) {
        None => DEFAULT_ITERATIONS,
        Some(raw) => match raw.parse::<u64>() {
            Ok(iterations) => iterations,
            Err(err) => {
                eprintln!(
                    "warning: ignoring invalid iteration count {raw:?} ({err}); \
                     using default {DEFAULT_ITERATIONS}"
                );
                DEFAULT_ITERATIONS
            }
        },
    }
}
217
218fn main() {
219    let iterations = parse_iterations();
220    let queue_size = 1024;
221
222    println!("morb benchmark");
223    println!("iterations: {iterations}");
224    println!("queue_size: {queue_size}");
225
226    benchmark_publish_only(iterations, queue_size);
227    benchmark_publish_consume(iterations, queue_size);
228    benchmark_publish_poll(iterations, queue_size);
229    benchmark_multi_producer(iterations);
230    benchmark_multi_subscriber(iterations);
231    benchmark_large_message("large_msg_64", iterations, Payload64 { bytes: [7; 64] });
232    benchmark_large_message("large_msg_256", iterations, Payload256 { bytes: [9; 256] });
233    benchmark_large_message(
234        "large_msg_1024",
235        iterations,
236        Payload1024 { bytes: [11; 1024] },
237    );
238    benchmark_blocking_poll(iterations);
239}