pub struct Publisher<T: MorbDataType> { /* private fields */ }
Implementations
impl<T: MorbDataType> Publisher<T>
pub fn publish(&self, data: T)
Publishes a value to the topic.
Examples found in repository?
examples/pubsub_benchmark.rs (line 55)
49fn benchmark_publish_only(iterations: u64, queue_size: u16) {
50 let topic = create_topic::<u64>(unique_topic_name("publish_only"), queue_size).unwrap();
51 let publisher = topic.create_publisher();
52
53 let started = Instant::now();
54 for value in 0..iterations {
55 publisher.publish(std::hint::black_box(value));
56 }
57 print_result("publish_only", iterations, started.elapsed());
58}
59
60fn benchmark_publish_consume(iterations: u64, queue_size: u16) {
61 let topic = create_topic::<u64>(unique_topic_name("publish_consume"), queue_size).unwrap();
62 let publisher = topic.create_publisher();
63 let mut subscriber = topic.create_subscriber();
64
65 let started = Instant::now();
66 for value in 0..iterations {
67 publisher.publish(std::hint::black_box(value));
68 std::hint::black_box(subscriber.check_update_and_copy().unwrap());
69 }
70 print_result("publish_consume", iterations, started.elapsed());
71}
72
73fn benchmark_publish_poll(iterations: u64, queue_size: u16) {
74 let topic = create_topic::<u64>(unique_topic_name("publish_poll"), queue_size).unwrap();
75 let publisher = topic.create_publisher();
76 let mut poller = TopicPoller::new();
77 poller.add_topic(&topic).unwrap();
78
79 let started = Instant::now();
80 for value in 0..iterations {
81 publisher.publish(std::hint::black_box(value));
82 poller.wait(Some(Duration::ZERO)).unwrap();
83 for token in poller.iter() {
84 std::hint::black_box(token);
85 }
86 }
87 print_result("publish_poll", iterations, started.elapsed());
88}
89
90fn benchmark_multi_producer(iterations: u64) {
91 const PRODUCERS: usize = 4;
92
93 let topic = create_topic::<u64>(unique_topic_name("multi_producer"), 1024).unwrap();
94 let barrier = Arc::new(Barrier::new(PRODUCERS + 1));
95 let per_producer = iterations / PRODUCERS as u64;
96 let total_ops = per_producer * PRODUCERS as u64;
97
98 let mut handles = Vec::new();
99 for producer_id in 0..PRODUCERS {
100 let barrier = barrier.clone();
101 let publisher = topic.create_publisher();
102 handles.push(thread::spawn(move || {
103 barrier.wait();
104 for seq in 0..per_producer {
105 publisher.publish((producer_id as u64 * per_producer) + seq);
106 }
107 }));
108 }
109
110 let started = Instant::now();
111 barrier.wait();
112 for handle in handles {
113 handle.join().unwrap();
114 }
115
116 print_result("multi_producer", total_ops, started.elapsed());
117}
118
119fn benchmark_multi_subscriber(iterations: u64) {
120 const SUBSCRIBERS: usize = 4;
121
122 let messages = iterations.min(50_000);
123 let queue_size = messages as u16;
124 let topic = create_topic::<u64>(unique_topic_name("multi_subscriber"), queue_size).unwrap();
125 let publisher = topic.create_publisher();
126 let start_barrier = Arc::new(Barrier::new(SUBSCRIBERS + 1));
127
128 let mut handles = Vec::new();
129 for _ in 0..SUBSCRIBERS {
130 let start_barrier = start_barrier.clone();
131 let mut subscriber = topic.create_subscriber();
132 handles.push(thread::spawn(move || {
133 start_barrier.wait();
134 for _ in 0..messages {
135 loop {
136 if let Some(value) = subscriber.check_update_and_copy() {
137 std::hint::black_box(value);
138 break;
139 }
140 thread::yield_now();
141 }
142 }
143 }));
144 }
145
146 let started = Instant::now();
147 start_barrier.wait();
148 for value in 0..messages {
149 publisher.publish(std::hint::black_box(value));
150 }
151 for handle in handles {
152 handle.join().unwrap();
153 }
154
155 print_result(
156 "multi_subscriber",
157 messages * SUBSCRIBERS as u64,
158 started.elapsed(),
159 );
160}
161
162fn benchmark_large_message<T>(name: &str, iterations: u64, sample: T)
163where
164 T: Clone + Copy + Send + Sync + 'static,
165{
166 let topic = create_topic::<T>(unique_topic_name(name), 1024).unwrap();
167 let publisher = topic.create_publisher();
168 let mut subscriber = topic.create_subscriber();
169
170 let started = Instant::now();
171 for _ in 0..iterations {
172 publisher.publish(std::hint::black_box(sample));
173 std::hint::black_box(subscriber.check_update_and_copy().unwrap());
174 }
175
176 print_result(name, iterations, started.elapsed());
177}
178
179fn benchmark_blocking_poll(iterations: u64) {
180 let waits = iterations.min(50_000);
181 let topic = create_topic::<u64>(unique_topic_name("blocking_poll"), 1024).unwrap();
182 let publisher = topic.create_publisher();
183 let (tx, rx) = mpsc::sync_channel::<()>(0);
184 let topic_for_thread = topic.clone();
185
186 let handle = thread::spawn(move || {
187 for value in 0..waits {
188 rx.recv().unwrap();
189 publisher.publish(value);
190 }
191 drop(topic_for_thread);
192 });
193
194 let mut poller = TopicPoller::new();
195 poller.add_topic(&topic).unwrap();
196
197 let started = Instant::now();
198 for _ in 0..waits {
199 tx.send(()).unwrap();
200 poller.wait(Some(Duration::from_secs(1))).unwrap();
201 for token in poller.iter() {
202 std::hint::black_box(token);
203 }
204 topic.clear_event();
205 }
206 handle.join().unwrap();
207
208 print_result("blocking_poll", waits, started.elapsed());
209}
Auto Trait Implementations
impl<T> Freeze for Publisher<T>
impl<T> RefUnwindSafe for Publisher<T>
impl<T> Send for Publisher<T>
impl<T> Sync for Publisher<T>
impl<T> Unpin for Publisher<T>
impl<T> UnsafeUnpin for Publisher<T>
impl<T> UnwindSafe for Publisher<T>
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more