atomic_bus/
lib.rs

1//! Atomic Bus: An unbounded, lock-free, multi-producer, multi-consumer pub/sub implementation that utilizes atomic operations.
2//!
3//! # Examples
4//!
5//! ## Basic Send/Subscribe
6//!
//! The following example will start a sender thread and a printing subscriber thread.
8//!
9//! ### Code
10//!
11//! ```
12//! use atomic_bus::AtomicBus;
13//! use std::{sync::Arc, time::Duration, thread};
14//!
15//! // create the bus
16//! let bus: AtomicBus<String> = AtomicBus::new();
17//!
18//! // subscribing before spawning the sender thread guarantees all sent messages will be received
19//! let mut subscriber = bus.subscribe_mut();
20//!
21//! // create and spawn a sender
22//! let sender = bus.create_sender();
23//! let arc_message = Arc::new("all messages are an Arc".to_owned());
24//! thread::spawn(move || {
25//!     sender.send("hello world!".to_owned());
26//!     sender.send(arc_message);
27//!     sender.send("done".to_owned());
28//! });
29//!
30//! // spawn printing subscriber and wait for it to complete
31//! thread::spawn(move || loop {
32//!     match subscriber.poll() {
33//!         None => thread::sleep(Duration::from_millis(1)),
34//!         Some(x) => {
35//!             println!("subscriber received: {x:?}");
36//!             if x.as_ref() == "done" {
37//!                 return;
38//!             }
39//!         }
40//!     }
41//! })
42//! .join()
43//! .unwrap();
44//! ```
45//!
46//! ### Output
47//!
48//! ```text
49//! subscriber received: "hello world!"
50//! subscriber received: "all messages are an Arc"
51//! subscriber received: "done"
52//! ```
53//!
54//! ## Load Balancing Subscription
55//!
//! The following example will start a sender thread and multiple subscriber threads that share the
//! same [`AtomicSubscription`] in order to load balance received events between multiple threads.
58//! For the sake of this example, the subscriber threads will always sleep after attempting to poll
59//! in order to simulate load and avoid a single greedy consumer receiving all events before others
60//! have had a chance to start.
61//!
62//! ### Code
63//!
64//! ```
65//! use atomic_bus::AtomicBus;
66//! use std::{sync::Arc, time::Duration, thread};
67//!
68//! // create the bus
69//! let bus: AtomicBus<String> = AtomicBus::new();
70//!
71//! // subscribing before spawning the sender thread guarantees all sent messages will be received
72//! let subscriber = Arc::new(bus.subscribe());
73//!
74//! // create and spawn a sender
75//! let sender = bus.create_sender();
76//! thread::spawn(move || {
77//!     for i in 0..10 {
78//!         sender.send(format!("message #{i}"));
79//!     }
80//! });
81//!
82//! // spawn printing subscriber threads that share a single AtomicSubscription
83//! let mut handles = Vec::new();
84//! {
85//!     for i in 0..3 {
86//!         let subscriber = Arc::clone(&subscriber);
87//!         let handle = thread::spawn(move || loop {
88//!             if let Some(x) = subscriber.poll() {
89//!                 println!("subscriber {i} received: {x:?}");
90//!                 if x.as_ref() == "done" {
91//!                     return;
92//!                 }
93//!             }
94//!             thread::sleep(Duration::from_millis(10));
95//!         });
96//!         handles.push(handle);
97//!     }
98//! };
99//! ```
100//!
101//! ### Output
102//!
103//! ```text
104//! subscriber 0 received: "message #0"
105//! subscriber 1 received: "message #1"
106//! subscriber 2 received: "message #2"
107//! subscriber 0 received: "message #3"
108//! subscriber 1 received: "message #4"
109//! subscriber 2 received: "message #5"
110//! subscriber 0 received: "message #6"
111//! subscriber 1 received: "message #7"
112//! subscriber 2 received: "message #8"
113//! subscriber 0 received: "message #9"
114//! ```
115
116use std::sync::Arc;
117
118use arc_swap::{ArcSwap, ArcSwapOption};
119
120/// A bus, which can send data to a set of active subscriptions.
121///
122/// # Data Lifetime
123///
124/// All underlying data is wrapped in an `Arc` which will be dropped immediately after all active subscribers have dropped their respective references.
125///
126/// # Sync
127///
128/// `AtomicBus` is [`Sync`], so it can be wrapped in an [`Arc`] to safely share between threads.
129pub struct AtomicBus<T> {
130    end: Arc<ArcSwap<NodeLink<T>>>,
131}
132impl<T> AtomicBus<T> {
133    /// Create a new `AtomicBus`
134    pub fn new() -> Self {
135        Self {
136            end: Arc::new(ArcSwap::new(Arc::new(NodeLink::default()))),
137        }
138    }
139
140    /// Send an event to the [`AtomicBus`] which will be received by all current subscribers.
141    ///
142    /// # Alternatives
143    ///
144    /// If you wish to limit code to be publish-only, see [`AtomicBus::sender`] to create a send-only struct for this bus.
145    pub fn send<A: Into<Arc<T>>>(&self, data: A) {
146        append_tail(self.end.as_ref(), data.into());
147    }
148
149    /// Create a send-only structure to produce messages to this bus.
150    pub fn create_sender(&self) -> AtomicSender<T> {
151        AtomicSender::new(Arc::clone(&self.end))
152    }
153
154    /// Create an atomic subscription to this bus, which is [`Sync`] and can be shared between threads.
155    pub fn subscribe(&self) -> AtomicSubscription<T> {
156        AtomicSubscription::new(&self.end)
157    }
158
159    /// Create a mutable subscription to this bus, which is slightly faster than the atomic variant but is not [`Sync`].
160    pub fn subscribe_mut(&self) -> MutSubscription<T> {
161        MutSubscription::new(&self.end)
162    }
163}
164impl<T> Default for AtomicBus<T> {
165    fn default() -> Self {
166        Self::new()
167    }
168}
169
170/// The send side of an [`AtomicBus`], providing identical functionality to [`AtomicBus::send`], but without the ability to create new subscribers.
171///
172/// # Clone
173///
174/// Cloning an [`AtomicSender`] will result in a new sender that is able to send on the same underlying [`AtomicBus`].
175#[derive(Clone)]
176pub struct AtomicSender<T> {
177    end: Arc<ArcSwap<NodeLink<T>>>,
178}
179impl<T> AtomicSender<T> {
180    fn new(end: Arc<ArcSwap<NodeLink<T>>>) -> Self {
181        Self { end }
182    }
183    /// See [`AtomicBus::send`]
184    pub fn send<A: Into<Arc<T>>>(&self, data: A) {
185        append_tail(self.end.as_ref(), data.into());
186    }
187}
188
189fn append_tail<T>(end: &ArcSwap<NodeLink<T>>, data: Arc<T>) {
190    let node = Arc::new(Node::new(data.into()));
191    let new_end = Arc::clone(&node.link);
192    let old_end = end.swap(new_end);
193    old_end.next.store(Some(node));
194}
195
196/// A [`Sync`] subscription that can be shared between threads, where each message will be delivered exactly once.
197///
198/// # Clone
199///
200/// Cloning an `AtomicSubscription` will create a new subscription at an identical position to the original.
201/// In other words, if there are 10 pending events to consume and the subscription is cloned, the cloned subscription
202/// will also have 10 pending events.
203///
204/// **Note:** This behavior means that cloning an [`AtomicSubscription`] results in different behavior than cloning
205/// an [`Arc<AtomicSubscription>`]. The former will create a new subscription at the same point in the data stream,
206/// while the latter will poll from the same subscription to share the load.
207///
208/// # Performance
209///
210/// The performance will be slightly worse than [`MutSubscription`], but allows for multiple workers to share the load from
211/// multiple threads by taking messages from a shared subscription as they are available.
212pub struct AtomicSubscription<T> {
213    position: Arc<ArcSwap<NodeLink<T>>>,
214}
215impl<T> AtomicSubscription<T> {
216    fn new(position: &ArcSwap<NodeLink<T>>) -> Self {
217        Self {
218            position: Arc::new(ArcSwap::new(position.load_full())),
219        }
220    }
221    /// Poll the next message for this subscription, returning immediately with [`None`] if no new messages were available.
222    pub fn poll(&self) -> Option<Arc<T>> {
223        let mut data = None;
224        self.position
225            .rcu(|old_position| match old_position.next.load_full() {
226                None => {
227                    // maintain last position
228                    data = None;
229                    Arc::clone(old_position)
230                }
231                Some(x) => {
232                    // advance position
233                    let new_position = Arc::clone(&x.link);
234                    data = Some(x);
235                    new_position
236                }
237            });
238        data.map(|x| Arc::clone(&x.data))
239    }
240}
241impl<T> Clone for AtomicSubscription<T> {
242    fn clone(&self) -> Self {
243        Self::new(self.position.as_ref())
244    }
245}
246
247/// A mutable subscription that utilizes `&mut self` to track the current position.
248///
249/// # Clone
250///
251/// Cloning a `MutSubscription` will create a new subscription at an identical position to the original.
252/// In other words, if there are 10 pending events to consume and the subscription is cloned, the cloned subscription
253/// will also have 10 pending events.
254///
255/// # Performance
256///
257/// While providing identical functionality to [`AtomicSubscription`], it is slightly faster at the expense of not being [`Sync`].
258#[derive(Clone)]
259pub struct MutSubscription<T> {
260    position: Arc<NodeLink<T>>,
261}
262impl<T> MutSubscription<T> {
263    fn new(position: &ArcSwap<NodeLink<T>>) -> Self {
264        Self {
265            position: position.load_full(),
266        }
267    }
268    /// Poll the next message for this subscription, returning immediately with [`None`] if no new messages were available.
269    pub fn poll(&mut self) -> Option<Arc<T>> {
270        match self.position.next.load_full() {
271            None => None,
272            Some(next) => {
273                let data = Arc::clone(&next.data);
274                self.position = Arc::clone(&next.link);
275                Some(data)
276            }
277        }
278    }
279}
280impl<T> Iterator for MutSubscription<T> {
281    type Item = Arc<T>;
282    fn next(&mut self) -> Option<Self::Item> {
283        self.poll()
284    }
285}
286
287struct Node<T> {
288    pub data: Arc<T>,
289    pub link: Arc<NodeLink<T>>,
290}
291impl<T> Node<T> {
292    fn new(data: Arc<T>) -> Self {
293        Self {
294            data,
295            link: Arc::new(NodeLink::default()),
296        }
297    }
298}
299
300struct NodeLink<T> {
301    pub next: ArcSwapOption<Node<T>>,
302}
303impl<T> Default for NodeLink<T> {
304    fn default() -> Self {
305        Self {
306            next: ArcSwapOption::new(None),
307        }
308    }
309}