amity/lib.rs
1//! # Amity
2//!
3//! Collection of concurrency algorithms.
4//! The collection is not fixed and more algorithms will be added.
5//! Algorithm may be removed from crate only if its implementation is unsound and cannot be fixed.
6//! In this case it will be deprecated first and removed in later version.
7//!
8//! Most algorithms require its own feature flag to be enabled.
9//!
10//! ## Available Algorithms
11//!
12//! ### 🔄 Backoff
13//! Provides utilities for implementing exponential backoff strategies, useful in retry mechanisms.
14//!
15//! #### Examples
16//!
17//! Here is an example of using the `BackOff` struct to implement exponential backoff when waiting for a resource:
18//!
19//! ```rust
20//! use amity::backoff::BackOff;
21//!
22//! fn try_acquire_resource(attempt: u64) -> Option<Resource> {
23//! // Simulate trying to acquire a shared resource
24//! if attempt.wrapping_mul(0xfedcba9876543210) < u64::MAX / 10 {
25//! Some(Resource {})
26//! } else {
27//! None
28//! }
29//! }
30//!
31//! struct Resource {}
32//!
33//! fn main() {
34//! let mut backoff = BackOff::new();
35//! let mut attempt = 0;
36//!
37//! loop {
38//! // Try to acquire the resource
39//! attempt += 1;
40//! if let Some(resource) = try_acquire_resource(attempt) {
41//! // Resource acquired, use it
42//! println!("Resource acquired!");
43//! break;
44//! }
45//!
46//! // Failed to acquire, check if we should block
47//! if backoff.should_block() {
48//! println!("Backoff limit reached, blocking thread");
49//! std::thread::sleep(std::time::Duration::from_millis(10));
50//! backoff.reset(); // Reset the backoff counter after blocking
51//! } else {
52//! // Wait with exponential backoff
53//! backoff.wait();
54//! }
55//! }
56//! }
57//! ```
58//!
59//! If blocking is not an option, you can use `BackOff::wait()`.
60//!
61//! ```rust
62//! use amity::backoff::BackOff;
63//!
64//! fn try_acquire_resource(attempt: u64) -> Option<Resource> {
65//! // Simulate trying to acquire a shared resource
66//! if attempt.wrapping_mul(0xfedcba9876543210) < u64::MAX / 10 {
67//! Some(Resource {})
68//! } else {
69//! None
70//! }
71//! }
72//!
73//! struct Resource {}
74//!
75//! fn main() {
76//! let mut backoff = BackOff::new();
77//! let mut attempt = 0;
78//!
79//! loop {
80//! // Try to acquire the resource
81//! attempt += 1;
82//! if let Some(resource) = try_acquire_resource(attempt) {
83//! // Resource acquired, use it
84//! println!("Resource acquired!");
85//! break;
86//! }
87//!
88//! // Wait with exponential backoff
89//! backoff.wait();
90//! }
91//! }
92//! ```
93//!
94//! The `BackOff` struct helps manage contention by implementing an efficient waiting strategy - first spinning, then yielding, and finally suggesting when the thread should block completely.
95//!
96//! ### 🌀 Cache
97//! Implements utilities for working with cache lines, optimizing memory access patterns in concurrent programming.
98//!
99//! #### Examples
100//!
101//! Here is an example of using the `CachePadded` struct to prevent false sharing in a concurrent scenario:
102//!
103//! ```rust
104//! use amity::cache::CachePadded;
105//! use std::{cell::UnsafeCell, sync::Arc, thread};
106//!
107//! struct SharedCounters {
108//! // Each counter is padded to avoid false sharing when accessed by different threads
109//! counter1: CachePadded<UnsafeCell<usize>>,
110//! counter2: CachePadded<UnsafeCell<usize>>,
111//! }
112//!
113//! unsafe impl Sync for SharedCounters {}
114//!
115//! impl SharedCounters {
116//! fn new() -> Self {
117//! Self {
118//! counter1: CachePadded(UnsafeCell::new(0)),
119//! counter2: CachePadded(UnsafeCell::new(0)),
120//! }
121//! }
122//! }
123//!
124//! let counters = Arc::new(SharedCounters::new());
125//! let counters_clone = Arc::clone(&counters);
126//!
127//! // Thread 1 updates counter1
128//! let thread1 = thread::spawn(move || {
129//! for _ in 0..1_000_000 {
130//! unsafe {
131//! // Need to use unsafe to get mutable access in this example
132//! let counter1 = &mut *counters_clone.counter1.get();
133//! *counter1 += 1;
134//! }
135//! }
136//! });
137//!
138//! let counters_clone = Arc::clone(&counters);
139//!
140//! // Thread 2 updates counter2 without false sharing
141//! let thread2 = thread::spawn(move || {
142//! for _ in 0..1_000_000 {
143//! unsafe {
144//! let counter2 = &mut *counters_clone.counter1.get();
145//! *counter2 += 1;
146//! }
147//! }
148//! });
149//!
150//! thread1.join().unwrap();
151//! thread2.join().unwrap();
152//! ```
153//!
154//! This example demonstrates how `CachePadded` can be used to prevent false sharing when multiple threads need to update different data simultaneously.
155//!
156//! ### 📐 State Pointer
157//! Combines state and pointer into a single atomic value, enabling efficient state management in concurrent programming.
158//!
159//! #### Examples
160//!
161//! Here is an example of using the `PtrState` and `AtomicPtrState` to efficiently combine state information with pointers:
162//!
163//! ```rust
164//! use amity::state_ptr::{PtrState, State, AtomicPtrState};
165//! use std::sync::atomic::Ordering;
166//!
167//! struct Node {
168//! data: u64,
169//! }
170//!
171//! // Create a sample node
172//! let mut node = Node { data: 42 };
173//!
174//! // Create a state value (limited by pointer alignment)
175//! let state = State::<Node>::new(3).unwrap();
176//!
177//! // Combine pointer and state into a single value
178//! let ptr_state = PtrState::new_mut(&mut node, state);
179//!
180//! // Extract pointer and state separately
181//! let ptr = ptr_state.ptr();
182//! let extracted_state = ptr_state.state();
183//!
184//! // Use the extracted pointer safely
185//! unsafe {
186//! assert_eq!((*ptr).data, 42);
187//! }
188//!
189//! println!("State value: {}", extracted_state.value());
190//!
191//! // Atomic version for thread-safe operations
192//! let atomic_ptr_state = AtomicPtrState::new_mut(&mut node, state);
193//!
194//! // Load the combined value atomically
195//! let loaded_ptr_state = atomic_ptr_state.load(Ordering::Acquire);
196//!
197//! // Update state while preserving the pointer
198//! let new_state = State::<Node>::new(5).unwrap();
199//! let updated_ptr_state = loaded_ptr_state.with_state(new_state);
200//!
201//! // Store the updated value atomically
202//! atomic_ptr_state.store(updated_ptr_state, Ordering::Release);
203//!
204//! println!("New state value: {}",
205//! atomic_ptr_state.load(Ordering::Relaxed).state().value());
206//! ```
207//!
208//! This pattern is useful in concurrent data structures where you need to pack flags or other state information with pointers without additional memory overhead. For example, it can be used in lock-free algorithms to mark nodes as "deleted" or to store version counters for ABA problem prevention.
209//!
210//! ### 🔗 Ring Buffer
211//! Implements simple ring-buffer that can be used to build concurrent data structures.
212//! **Feature:** `ring-buffer`
213//!
214//! #### Examples
215//!
216//! Here is an example of using the `RingBuffer` struct for efficient FIFO operations:
217//!
218//! ```rust
219//! # #[cfg(feature = "ring-buffer")]
220//! # {
221//! use amity::ring_buffer::RingBuffer;
222//!
223//! // Create a new ring buffer
224//! let mut buffer = RingBuffer::<i32>::new();
225//!
226//! // Push some elements
227//! buffer.push(10);
228//! buffer.push(20);
229//! buffer.push(30);
230//!
231//! // Check the buffer status
232//! println!("Buffer length: {}", buffer.len());
233//! println!("Buffer capacity: {}", buffer.capacity());
234//!
235//! // Pop elements (FIFO order)
236//! while let Some(value) = buffer.pop() {
237//! println!("Popped value: {}", value);
238//! }
239//!
240//! // Buffer is now empty
241//! assert!(buffer.is_empty());
242//!
243//! // Using with_capacity for performance
244//! let mut buffer = RingBuffer::<String>::with_capacity(10);
245//!
246//! // Fill the buffer
247//! for i in 0..5 {
248//! buffer.push(format!("Item {}", i));
249//! }
250//!
251//! // Use drain to consume all elements
252//! for item in buffer.drain() {
253//! println!("Drained: {}", item);
254//! }
255//!
256//! // Buffer is automatically cleared after drain
257//! assert!(buffer.is_empty());
258//! # }
259//! ```
260//!
261//! Ring buffers are particularly useful in scenarios requiring fixed-size queues or when implementing producer-consumer patterns where elements need to be processed in order.
262//!
263//! ### 🔃 Flip Queue
264//! Queue implementation that allows concurrent writes, but only exclusive reads.
265//! This is useful for scenarios where multiple threads need to write data concurrently, and single thread swaps inner and own `RingBuffer` to read it.
266//!
267//! `FlipBuffer` is lockless version and requires mutable access to read and to expand internal buffer when full.
268//!
269//! `FlipQueue` uses read-write lock to allow concurrent pushes until buffer is full,
270//! and locks it for exclusive access to grow buffer and to swap it with reader.
271//! **Feature:** `flip-queue`
272//!
273//! #### Examples
274//!
275//! Here is an example of using the `FlipQueue` for concurrent writes from multiple threads and exclusive reads:
276//!
277//! ```rust
278//! # #[cfg(feature = "flip-queue")]
279//! # {
280//! use amity::flip_queue::FlipQueue;
281//! use amity::ring_buffer::RingBuffer;
282//! use std::sync::Arc;
283//! use std::thread;
284//!
285//! // Create a shared flip queue with initial capacity
286//! let queue = Arc::new(FlipQueue::<usize>::with_capacity(16));
287//!
288//! // Spawn multiple producer threads
289//! let mut handles = vec![];
290//! for thread_id in 0..4 {
291//! let queue_clone = Arc::clone(&queue);
292//! let handle = thread::spawn(move || {
293//! for i in 0..25 {
294//! let value = thread_id * 100 + i;
295//! queue_clone.push_sync(value);
296//! println!("Thread {} pushed {}", thread_id, value);
297//! }
298//! });
299//! handles.push(handle);
300//! }
301//!
302//! // Wait for producers to finish
303//! for handle in handles {
304//! handle.join().unwrap();
305//! }
306//!
307//! // Using drain_locking for exclusive access
308//! let items: Vec<_> = queue.drain_locking(|drain| drain.collect());
309//! println!("Collected {} items", items.len());
310//!
311//! // Alternative approach: swap with an empty buffer for bulk processing
312//! let mut queue = FlipQueue::<String>::new();
313//!
314//! // Add some items
315//! for i in 0..10 {
316//! queue.push(format!("Item {}", i));
317//! }
318//!
319//! // Prepare an empty buffer to swap with
320//! let mut buffer = RingBuffer::new();
321//!
322//! // Swap buffers - very efficient for bulk processing
323//! queue.swap_buffer(&mut buffer);
324//!
325//! // Process items from the swapped buffer
326//! while let Some(item) = buffer.pop() {
327//! println!("Processing: {}", item);
328//! }
329//! # }
330//! ```
331//!
332//! This pattern is especially useful when you have multiple producers that need to add data concurrently without blocking each other, but processing happens on a single consumer thread. The `swap_buffer` method provides a very efficient way to batch process items without holding locks for extended periods.
333//!
334//! ### 🔺 Triple
335//! Implements triple-buffering for wait-free data transfer between single producer single and consumer threads.
336//! This allows for efficient data exchange without the need for locks.
337//!
338//! Both consumer and producer has exclusive access to its own slot, allowing taking a mutable reference to it, which grants the ability to modify data in place, unlike channel-based approaches.
339//!
340//! **Feature:** `triple`
341//!
342//! #### Examples
343//!
344//! Here is an example of using the `TripleBuffer`:
345//!
346//! ```rust
347//! # #[cfg(feature = "triple")]
348//! # {
349//! use amity::triple::TripleBuffer;
350//!
351//! // Create a new triple buffer with initial values
352//! let mut buffer = TripleBuffer::<u32>::default();
353//!
354//! // Split the buffer into producer and consumer
355//! let (mut producer, mut consumer) = buffer.split_mut();
356//!
357//! // Producer updates its element
358//! *producer.get_mut() = 42;
359//!
360//! // Publish the updated element
361//! producer.publish();
362//!
363//! // Consumer consumes the element
364//! if consumer.consume() {
365//! println!("Consumed value: {}", consumer.get());
366//! }
367//! # }
368//! ```
369//!
370//! ### 📡 Broad
371//! A broadcast mechanism to notify multiple listeners of events concurrently.
372//! **Feature:** `broad`
373//!
374//! #### Examples
375//!
376//! Here is an example of using the `broad` module:
377//!
378//! ```rust
379//! # #[cfg(feature = "broad")]
380//! # {
381//! use amity::broad::{Receiver, Sender};
382//!
383//! // Create a new broadcast channel with an initial value
384//! let mut tx = Sender::new(0u32);
385//! let mut rx = tx.receiver();
386//!
387//! // Sender sends a new value
388//! tx.send(42);
389//!
390//! // Receiver receives the new value
391//! if let Some(value) = rx.recv() {
392//! println!("Received value: {}", value);
393//! }
394//! # }
395//! ```
396//!
397//! ### 🔁 Spin
398//! Provides a low-latency spinlock implementation for mutual exclusion in critical sections. Also includes a not-totally-unfair read-write spin lock for efficient concurrent read and write operations.
399//! **Feature:** `spin`
400//!
401//! #### Examples
402//!
403//! Here is an example of using the `Spin` mutex for mutual exclusion:
404//!
405//! ```rust
406//! # #[cfg(feature = "spin")]
407//! # {
408//! use amity::spin::Spin;
409//! use std::sync::Arc;
410//! use std::thread;
411//!
412//! let counter = Arc::new(Spin::new(0));
413//! let mut handles = vec![];
414//!
415//! // Spawn multiple threads that increment the counter
416//! for _ in 0..10 {
417//! let counter_clone = Arc::clone(&counter);
418//! let handle = thread::spawn(move || {
419//! for _ in 0..100 {
420//! // Lock the mutex and update the counter
421//! let mut count = counter_clone.lock();
422//! *count += 1;
423//! // Mutex automatically unlocks when `count` goes out of scope
424//! }
425//! });
426//! handles.push(handle);
427//! }
428//!
429//! // Wait for all threads to complete
430//! for handle in handles {
431//! handle.join().unwrap();
432//! }
433//!
434//! println!("Final count: {}", *counter.lock());
435//! assert_eq!(*counter.lock(), 1000);
436//! # }
437//! ```
438//!
439//! Here is an example of using the `RwSpin` read-write lock for concurrent read access and exclusive write access:
440//!
441//! ```rust
442//! # #[cfg(feature = "spin")]
443//! # {
444//! use amity::spin::RwSpin;
445//! use std::sync::Arc;
446//! use std::thread;
447//! use std::time::Duration;
448//!
449//! let data = Arc::new(RwSpin::new(vec![1, 2, 3, 4]));
450//! let mut handles = vec![];
451//!
452//! // Spawn reader threads
453//! for i in 0..3 {
454//! let data_clone = Arc::clone(&data);
455//! let handle = thread::spawn(move || {
456//! for _ in 0..5 {
457//! // Acquire a read lock - multiple readers can access simultaneously
458//! let values = data_clone.read();
459//! println!("Reader {}: Current values: {:?}", i, *values);
460//!
461//! // Simulate some processing time
462//! thread::sleep(Duration::from_millis(10));
463//!
464//! // Read lock automatically released when `values` goes out of scope
465//! }
466//! });
467//! handles.push(handle);
468//! }
469//!
470//! // Spawn writer threads
471//! for i in 0..2 {
472//! let data_clone = Arc::clone(&data);
473//! let handle = thread::spawn(move || {
474//! for j in 0..3 {
475//! // Acquire a write lock - exclusive access
476//! let mut values = data_clone.write();
477//! values.push(i * 10 + j);
478//! println!("Writer {}: Added value {}", i, i * 10 + j);
479//!
480//! // Simulate some processing time
481//! thread::sleep(Duration::from_millis(50));
482//!
483//! // Write lock automatically released when `values` goes out of scope
484//! }
485//! });
486//! handles.push(handle);
487//! }
488//!
489//! // Wait for all threads to complete
490//! for handle in handles {
491//! handle.join().unwrap();
492//! }
493//!
494//! // Print the final state
495//! let final_data = data.read();
496//! println!("Final values: {:?}", *final_data);
497//! # }
498//! ```
499//!
500//! ## `no-std` support
501//!
502//! If algorithm requires `std` its feature name will have `-std` suffix and enable `std` feature.
503//!
504
505#![cfg_attr(not(feature = "std"), no_std)]
506#![deny(clippy::pedantic)]
507#![allow(clippy::inline_always)]
508
509#[cfg(feature = "alloc")]
510extern crate alloc;
511
512pub mod backoff;
513pub mod cache;
514pub mod state_ptr;
515
516#[cfg(feature = "broad")]
517pub mod broad;
518
519#[cfg(feature = "flip-queue")]
520pub mod flip_queue;
521
522#[cfg(feature = "ring-buffer")]
523pub mod ring_buffer;
524
525#[cfg(feature = "spin")]
526pub mod spin;
527
528#[cfg(feature = "triple")]
529pub mod triple;
530
531#[cfg(all(feature = "spin", not(feature = "parking_lot")))]
532#[allow(dead_code)]
533type DefaultRawRwLock = spin::RawRwSpin;
534
535#[cfg(feature = "parking_lot")]
536#[allow(dead_code)]
537type DefaultRawRwLock = parking_lot::RawRwLock;
538
539// One central function responsible for reporting capacity overflows. This'll
540// ensure that the code generation related to these panics is minimal as there's
541// only one location which panics rather than a bunch throughout the module.
542#[cfg(feature = "alloc")]
543#[allow(dead_code)]
544#[cold]
545#[inline(never)]
546fn capacity_overflow() -> ! {
547 panic!("capacity overflow");
548}
549
550// pub enum Ordering {
551// Relaxed,
552// Release,
553// Acquire,
554// AcqRel,
555// SeqCst,
556// }
557
558// #[inline]
559// fn merge_ordering(lhs: Ordering, rhs: Ordering) -> Ordering {
560// match (lhs, rhs) {
561// (Ordering::SeqCst, _) => Ordering::SeqCst,
562// (_, Ordering::SeqCst) => Ordering::SeqCst,
563// (Ordering::AcqRel, _) => Ordering::AcqRel,
564// (_, Ordering::AcqRel) => Ordering::AcqRel,
565// (Ordering::Acquire, Ordering::Release) => Ordering::AcqRel,
566// (Ordering::Release, Ordering::Acquire) => Ordering::AcqRel,
567// (Ordering::Acquire, _) => Ordering::Acquire,
568// (_, Ordering::Acquire) => Ordering::Acquire,
569// (Ordering::Release, _) => Ordering::Release,
570// (_, Ordering::Release) => Ordering::Release,
571// (Ordering::Relaxed, Ordering::Relaxed) => Ordering::Relaxed,
572// _ => unreachable!("amity does not use any other ordering"),
573// }
574// }