atomic_bus/lib.rs
1//! Atomic Bus: An unbounded, lock-free, multi-producer, multi-consumer pub/sub implementation that utilizes atomic operations.
2//!
3//! # Examples
4//!
5//! ## Basic Send/Subscribe
6//!
//! The following example will start a sender thread and a printing subscriber thread.
8//!
9//! ### Code
10//!
11//! ```
12//! use atomic_bus::AtomicBus;
13//! use std::{sync::Arc, time::Duration, thread};
14//!
15//! // create the bus
16//! let bus: AtomicBus<String> = AtomicBus::new();
17//!
18//! // subscribing before spawning the sender thread guarantees all sent messages will be received
19//! let mut subscriber = bus.subscribe_mut();
20//!
21//! // create and spawn a sender
22//! let sender = bus.create_sender();
23//! let arc_message = Arc::new("all messages are an Arc".to_owned());
24//! thread::spawn(move || {
25//! sender.send("hello world!".to_owned());
26//! sender.send(arc_message);
27//! sender.send("done".to_owned());
28//! });
29//!
30//! // spawn printing subscriber and wait for it to complete
31//! thread::spawn(move || loop {
32//! match subscriber.poll() {
33//! None => thread::sleep(Duration::from_millis(1)),
34//! Some(x) => {
35//! println!("subscriber received: {x:?}");
36//! if x.as_ref() == "done" {
37//! return;
38//! }
39//! }
40//! }
41//! })
42//! .join()
43//! .unwrap();
44//! ```
45//!
46//! ### Output
47//!
48//! ```text
49//! subscriber received: "hello world!"
50//! subscriber received: "all messages are an Arc"
51//! subscriber received: "done"
52//! ```
53//!
54//! ## Load Balancing Subscription
55//!
//! The following example will start a sender thread and multiple subscriber threads that share the
//! same [`AtomicSubscription`] in order to load balance received events between multiple threads.
58//! For the sake of this example, the subscriber threads will always sleep after attempting to poll
59//! in order to simulate load and avoid a single greedy consumer receiving all events before others
60//! have had a chance to start.
61//!
62//! ### Code
63//!
64//! ```
65//! use atomic_bus::AtomicBus;
66//! use std::{sync::Arc, time::Duration, thread};
67//!
68//! // create the bus
69//! let bus: AtomicBus<String> = AtomicBus::new();
70//!
71//! // subscribing before spawning the sender thread guarantees all sent messages will be received
72//! let subscriber = Arc::new(bus.subscribe());
73//!
74//! // create and spawn a sender
75//! let sender = bus.create_sender();
76//! thread::spawn(move || {
77//! for i in 0..10 {
78//! sender.send(format!("message #{i}"));
79//! }
80//! });
81//!
82//! // spawn printing subscriber threads that share a single AtomicSubscription
83//! let mut handles = Vec::new();
84//! {
85//! for i in 0..3 {
86//! let subscriber = Arc::clone(&subscriber);
87//! let handle = thread::spawn(move || loop {
88//! if let Some(x) = subscriber.poll() {
89//! println!("subscriber {i} received: {x:?}");
90//! if x.as_ref() == "done" {
91//! return;
92//! }
93//! }
94//! thread::sleep(Duration::from_millis(10));
95//! });
96//! handles.push(handle);
97//! }
98//! };
99//! ```
100//!
101//! ### Output
102//!
103//! ```text
104//! subscriber 0 received: "message #0"
105//! subscriber 1 received: "message #1"
106//! subscriber 2 received: "message #2"
107//! subscriber 0 received: "message #3"
108//! subscriber 1 received: "message #4"
109//! subscriber 2 received: "message #5"
110//! subscriber 0 received: "message #6"
111//! subscriber 1 received: "message #7"
112//! subscriber 2 received: "message #8"
113//! subscriber 0 received: "message #9"
114//! ```
115
116use std::sync::Arc;
117
118use arc_swap::{ArcSwap, ArcSwapOption};
119
120/// A bus, which can send data to a set of active subscriptions.
121///
122/// # Data Lifetime
123///
124/// All underlying data is wrapped in an `Arc` which will be dropped immediately after all active subscribers have dropped their respective references.
125///
126/// # Sync
127///
128/// `AtomicBus` is [`Sync`], so it can be wrapped in an [`Arc`] to safely share between threads.
129pub struct AtomicBus<T> {
130 end: Arc<ArcSwap<NodeLink<T>>>,
131}
132impl<T> AtomicBus<T> {
133 /// Create a new `AtomicBus`
134 pub fn new() -> Self {
135 Self {
136 end: Arc::new(ArcSwap::new(Arc::new(NodeLink::default()))),
137 }
138 }
139
140 /// Send an event to the [`AtomicBus`] which will be received by all current subscribers.
141 ///
142 /// # Alternatives
143 ///
144 /// If you wish to limit code to be publish-only, see [`AtomicBus::sender`] to create a send-only struct for this bus.
145 pub fn send<A: Into<Arc<T>>>(&self, data: A) {
146 append_tail(self.end.as_ref(), data.into());
147 }
148
149 /// Create a send-only structure to produce messages to this bus.
150 pub fn create_sender(&self) -> AtomicSender<T> {
151 AtomicSender::new(Arc::clone(&self.end))
152 }
153
154 /// Create an atomic subscription to this bus, which is [`Sync`] and can be shared between threads.
155 pub fn subscribe(&self) -> AtomicSubscription<T> {
156 AtomicSubscription::new(&self.end)
157 }
158
159 /// Create a mutable subscription to this bus, which is slightly faster than the atomic variant but is not [`Sync`].
160 pub fn subscribe_mut(&self) -> MutSubscription<T> {
161 MutSubscription::new(&self.end)
162 }
163}
164impl<T> Default for AtomicBus<T> {
165 fn default() -> Self {
166 Self::new()
167 }
168}
169
170/// The send side of an [`AtomicBus`], providing identical functionality to [`AtomicBus::send`], but without the ability to create new subscribers.
171///
172/// # Clone
173///
174/// Cloning an [`AtomicSender`] will result in a new sender that is able to send on the same underlying [`AtomicBus`].
175#[derive(Clone)]
176pub struct AtomicSender<T> {
177 end: Arc<ArcSwap<NodeLink<T>>>,
178}
179impl<T> AtomicSender<T> {
180 fn new(end: Arc<ArcSwap<NodeLink<T>>>) -> Self {
181 Self { end }
182 }
183 /// See [`AtomicBus::send`]
184 pub fn send<A: Into<Arc<T>>>(&self, data: A) {
185 append_tail(self.end.as_ref(), data.into());
186 }
187}
188
189fn append_tail<T>(end: &ArcSwap<NodeLink<T>>, data: Arc<T>) {
190 let node = Arc::new(Node::new(data.into()));
191 let new_end = Arc::clone(&node.link);
192 let old_end = end.swap(new_end);
193 old_end.next.store(Some(node));
194}
195
196/// A [`Sync`] subscription that can be shared between threads, where each message will be delivered exactly once.
197///
198/// # Clone
199///
200/// Cloning an `AtomicSubscription` will create a new subscription at an identical position to the original.
201/// In other words, if there are 10 pending events to consume and the subscription is cloned, the cloned subscription
202/// will also have 10 pending events.
203///
204/// **Note:** This behavior means that cloning an [`AtomicSubscription`] results in different behavior than cloning
205/// an [`Arc<AtomicSubscription>`]. The former will create a new subscription at the same point in the data stream,
206/// while the latter will poll from the same subscription to share the load.
207///
208/// # Performance
209///
210/// The performance will be slightly worse than [`MutSubscription`], but allows for multiple workers to share the load from
211/// multiple threads by taking messages from a shared subscription as they are available.
212pub struct AtomicSubscription<T> {
213 position: Arc<ArcSwap<NodeLink<T>>>,
214}
215impl<T> AtomicSubscription<T> {
216 fn new(position: &ArcSwap<NodeLink<T>>) -> Self {
217 Self {
218 position: Arc::new(ArcSwap::new(position.load_full())),
219 }
220 }
221 /// Poll the next message for this subscription, returning immediately with [`None`] if no new messages were available.
222 pub fn poll(&self) -> Option<Arc<T>> {
223 let mut data = None;
224 self.position
225 .rcu(|old_position| match old_position.next.load_full() {
226 None => {
227 // maintain last position
228 data = None;
229 Arc::clone(old_position)
230 }
231 Some(x) => {
232 // advance position
233 let new_position = Arc::clone(&x.link);
234 data = Some(x);
235 new_position
236 }
237 });
238 data.map(|x| Arc::clone(&x.data))
239 }
240}
241impl<T> Clone for AtomicSubscription<T> {
242 fn clone(&self) -> Self {
243 Self::new(self.position.as_ref())
244 }
245}
246
247/// A mutable subscription that utilizes `&mut self` to track the current position.
248///
249/// # Clone
250///
251/// Cloning a `MutSubscription` will create a new subscription at an identical position to the original.
252/// In other words, if there are 10 pending events to consume and the subscription is cloned, the cloned subscription
253/// will also have 10 pending events.
254///
255/// # Performance
256///
257/// While providing identical functionality to [`AtomicSubscription`], it is slightly faster at the expense of not being [`Sync`].
258#[derive(Clone)]
259pub struct MutSubscription<T> {
260 position: Arc<NodeLink<T>>,
261}
262impl<T> MutSubscription<T> {
263 fn new(position: &ArcSwap<NodeLink<T>>) -> Self {
264 Self {
265 position: position.load_full(),
266 }
267 }
268 /// Poll the next message for this subscription, returning immediately with [`None`] if no new messages were available.
269 pub fn poll(&mut self) -> Option<Arc<T>> {
270 match self.position.next.load_full() {
271 None => None,
272 Some(next) => {
273 let data = Arc::clone(&next.data);
274 self.position = Arc::clone(&next.link);
275 Some(data)
276 }
277 }
278 }
279}
280impl<T> Iterator for MutSubscription<T> {
281 type Item = Arc<T>;
282 fn next(&mut self) -> Option<Self::Item> {
283 self.poll()
284 }
285}
286
287struct Node<T> {
288 pub data: Arc<T>,
289 pub link: Arc<NodeLink<T>>,
290}
291impl<T> Node<T> {
292 fn new(data: Arc<T>) -> Self {
293 Self {
294 data,
295 link: Arc::new(NodeLink::default()),
296 }
297 }
298}
299
300struct NodeLink<T> {
301 pub next: ArcSwapOption<Node<T>>,
302}
303impl<T> Default for NodeLink<T> {
304 fn default() -> Self {
305 Self {
306 next: ArcSwapOption::new(None),
307 }
308 }
309}