1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
//! Atomic Bus: An unbounded, lock-free, multi-producer, multi-consumer pub/sub implementation that utilizes atomic operations.
//!
//! # Examples
//!
//! ## Basic Send/Subscribe
//!
//! The following example will start a sender thread and printing subscriber thread.
//!
//! ### Code
//!
//! ```
//! use atomic_bus::AtomicBus;
//! use std::{sync::Arc, time::Duration, thread};
//!
//! // create the bus
//! let bus: AtomicBus<String> = AtomicBus::new();
//!
//! // subscribing before spawning the sender thread guarantees all sent messages will be received
//! let mut subscriber = bus.subscribe_mut();
//!
//! // create and spawn a sender
//! let sender = bus.create_sender();
//! let arc_message = Arc::new("all messages are an Arc".to_owned());
//! thread::spawn(move || {
//! sender.send("hello world!".to_owned());
//! sender.send(arc_message);
//! sender.send("done".to_owned());
//! });
//!
//! // spawn printing subscriber and wait for it to complete
//! thread::spawn(move || loop {
//! match subscriber.poll() {
//! None => thread::sleep(Duration::from_millis(1)),
//! Some(x) => {
//! println!("subscriber received: {x:?}");
//! if x.as_ref() == "done" {
//! return;
//! }
//! }
//! }
//! })
//! .join()
//! .unwrap();
//! ```
//!
//! ### Output
//!
//! ```text
//! subscriber received: "hello world!"
//! subscriber received: "all messages are an Arc"
//! subscriber received: "done"
//! ```
//!
//! ## Load Balancing Subscription
//!
//! The following example will start a sender thread and multiple subscriber threads that shared the
//! same [`AtomicSubscriber`] in order to load balance received events between multiple threads.
//! For the sake of this example, the subscriber threads will always sleep after attempting to poll
//! in order to simulate load and avoid a single greedy consumer receiving all events before others
//! have had a chance to start.
//!
//! ### Code
//!
//! ```
//! use atomic_bus::AtomicBus;
//! use std::{sync::Arc, time::Duration, thread};
//!
//! // create the bus
//! let bus: AtomicBus<String> = AtomicBus::new();
//!
//! // subscribing before spawning the sender thread guarantees all sent messages will be received
//! let subscriber = Arc::new(bus.subscribe());
//!
//! // create and spawn a sender
//! let sender = bus.create_sender();
//! thread::spawn(move || {
//! for i in 0..10 {
//! sender.send(format!("message #{i}"));
//! }
//! });
//!
//! // spawn printing subscriber threads that share a single AtomicSubscription
//! let mut handles = Vec::new();
//! {
//! for i in 0..3 {
//! let subscriber = Arc::clone(&subscriber);
//! let handle = thread::spawn(move || loop {
//! if let Some(x) = subscriber.poll() {
//! println!("subscriber {i} received: {x:?}");
//! if x.as_ref() == "done" {
//! return;
//! }
//! }
//! thread::sleep(Duration::from_millis(10));
//! });
//! handles.push(handle);
//! }
//! };
//! ```
//!
//! ### Output
//!
//! ```text
//! subscriber 0 received: "message #0"
//! subscriber 1 received: "message #1"
//! subscriber 2 received: "message #2"
//! subscriber 0 received: "message #3"
//! subscriber 1 received: "message #4"
//! subscriber 2 received: "message #5"
//! subscriber 0 received: "message #6"
//! subscriber 1 received: "message #7"
//! subscriber 2 received: "message #8"
//! subscriber 0 received: "message #9"
//! ```
use Arc;
use ;
/// A bus, which can send data to a set of active subscriptions.
///
/// # Data Lifetime
///
/// All underlying data is wrapped in an `Arc` which will be dropped immediately after all active subscribers have dropped their respective references.
///
/// # Sync
///
/// `AtomicBus` is [`Sync`], so it can be wrapped in an [`Arc`] to safely share between threads.
/// The send side of an [`AtomicBus`], providing identical functionality to [`AtomicBus::send`], but without the ability to create new subscribers.
///
/// # Clone
///
/// Cloning an [`AtomicSender`] will result in a new sender that is able to send on the same underlying [`AtomicBus`].
/// A [`Sync`] subscription that can be shared between threads, where each message will be delivered exactly once.
///
/// # Clone
///
/// Cloning an `AtomicSubscription` will create a new subscription at an identical position to the original.
/// In other words, if there are 10 pending events to consume and the subscription is cloned, the cloned subscription
/// will also have 10 pending events.
///
/// **Note:** This behavior means that cloning an [`AtomicSubscription`] results in different behavior than cloning
/// an [`Arc<AtomicSubscription>`]. The former will create a new subscription at the same point in the data stream,
/// while the latter will poll from the same subscription to share the load.
///
/// # Performance
///
/// The performance will be slightly worse than [`MutSubscription`], but allows for multiple workers to share the load from
/// multiple threads by taking messages from a shared subscription as they are available.
/// A mutable subscription that utilizes `&mut self` to track the current position.
///
/// # Clone
///
/// Cloning a `MutSubscription` will create a new subscription at an identical position to the original.
/// In other words, if there are 10 pending events to consume and the subscription is cloned, the cloned subscription
/// will also have 10 pending events.
///
/// # Performance
///
/// While providing identical functionality to [`AtomicSubscription`], it is slightly faster at the expense of not being [`Sync`].