thread_share/atomic.rs

//! # Atomic Module - ArcThreadShare<T>
//!
//! This module provides `ArcThreadShare<T>`, a high-performance structure for
//! zero-copy data sharing between threads using atomic operations.
//!
//! ## ⚠️ Important Warning
//!
//! **`ArcThreadShare<T>` has significant limitations and should be used with caution!**
//!
//! ## Overview
//!
//! `ArcThreadShare<T>` uses `Arc<AtomicPtr<T>>` internally to provide zero-copy
//! data sharing without locks. While this can offer high performance, it comes
//! with important trade-offs.
//!
//! ## Key Features
//!
//! - **Zero-Copy Operations**: No data cloning during access
//! - **Atomic Updates**: Uses atomic pointer operations
//! - **High Performance**: Potentially faster than lock-based approaches
//! - **Memory Efficiency**: Single copy of data shared across threads
//!
//! ## ⚠️ Critical Limitations
//!
//! ### 1. **Non-Atomic Complex Operations**
//! ```rust
//! use thread_share::ArcThreadShare;
//!
//! let arc_share = ArcThreadShare::new(0);
//!
//! // ❌ This is NOT atomic and can cause race conditions
//! arc_share.update(|x| *x = *x + 1);
//!
//! // ✅ Use the atomic increment method instead
//! arc_share.increment();
//! ```
//!
//! **Problem**: The `update` method with complex operations like `+=` is not atomic.
//! Between reading the value, modifying it, and writing it back, other threads can interfere.
//!
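//! For example, the race becomes visible when several threads drive `update` at the same
//! time. The following is only a sketch (marked `no_run` because it deliberately exercises
//! the race); the exact amount of loss depends on timing and platform:
//!
//! ```rust,no_run
//! use std::thread;
//! use thread_share::ArcThreadShare;
//!
//! let counter = ArcThreadShare::new(0u64);
//! let mut handles = Vec::new();
//!
//! for _ in 0..4 {
//!     let c = counter.clone();
//!     handles.push(thread::spawn(move || {
//!         for _ in 0..1_000 {
//!             // Read-modify-write through `update` is not atomic: two threads can read
//!             // the same value and one of the two increments is lost.
//!             c.update(|x| *x += 1);
//!         }
//!     }));
//! }
//!
//! for h in handles {
//!     h.join().unwrap();
//! }
//!
//! // The observed total is often below 4_000 because of lost updates; the same loop
//! // built on `increment()` retries with compare-and-swap instead.
//! println!("observed = {}", counter.get());
//! ```
//!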
//! ### 2. **High Contention Performance Issues**
//! ```rust
//! use thread_share::ArcThreadShare;
//!
//! let arc_share = ArcThreadShare::new(0);
//!
//! // ❌ High contention can cause significant performance degradation
//! for _ in 0..10000 {
//!     arc_share.increment(); // May lose many operations under high contention
//! }
//! ```
//!
//! **Problem**: Under high contention (many threads updating simultaneously), `AtomicPtr`
//! operations can lose updates due to:
//! - Box allocation/deallocation overhead
//! - CAS (Compare-And-Swap) failures requiring retries
//! - Memory pressure from frequent allocations
//!
//! **Expected Behavior**: In high-contention scenarios, you may see only 20-30% of
//! expected operations complete successfully.
//!
//! ### 3. **Memory Allocation Overhead**
//! ```rust
//! use thread_share::ArcThreadShare;
//!
//! let arc_share = ArcThreadShare::new(0);
//!
//! // Each increment operation involves:
//! // 1. Allocating new Box<T>
//! // 2. Converting to raw pointer
//! // 3. Atomic pointer swap
//! // 4. Deallocating old Box<T>
//! arc_share.increment();
//! ```
//!
//! **Problem**: Every update operation creates a new `Box<T>` and deallocates the old one,
//! which can be expensive for large data types.
//!
//! ## When to Use ArcThreadShare<T>
//!
//! ### ✅ Good Use Cases
//! - **Low-contention scenarios** (few threads, infrequent updates)
//! - **Performance-critical applications** where you understand the limitations
//! - **Simple atomic operations** using built-in methods (`increment()`, `add()`)
//! - **Read-heavy workloads** with occasional writes
//!
//! ### ❌ Avoid When
//! - **High-frequency updates** (>1000 ops/second per thread)
//! - **Critical data integrity** requirements
//! - **Predictable performance** needs
//! - **Large data structures** (due to allocation overhead)
//! - **Multi-threaded counters** with strict accuracy requirements
//!
//! ## Example Usage
//!
//! ### Basic Operations
//! ```rust
//! use thread_share::ArcThreadShare;
//!
//! let counter = ArcThreadShare::new(0);
//!
//! // Use atomic methods for safety
//! counter.increment();
//! counter.add(5);
//!
//! assert_eq!(counter.get(), 6);
//! ```
//!
//! ### From ThreadShare
//! ```rust
//! use thread_share::{share, ArcThreadShare};
//!
//! let data = share!(String::from("Hello"));
//! let arc_data = data.as_arc();
//! let arc_share = ArcThreadShare::from_arc(arc_data);
//!
//! // Note: `update` is not atomic; it is fine here because only one thread touches the data
//! arc_share.update(|s| s.push_str(" World"));
//! ```
//!
//! ## Performance Characteristics
//!
//! - **Low Contention**: Excellent performance, minimal overhead
//! - **Medium Contention**: Good performance with some lost operations
//! - **High Contention**: Poor performance, many lost operations
//! - **Memory Usage**: Higher due to Box allocation/deallocation
//!
//! ## Best Practices
//!
//! 1. **Always use atomic methods** (`increment()`, `add()`) instead of complex `update()` operations
//! 2. **Test with realistic contention levels** before production use (see the sketch after this list)
//! 3. **Consider `ThreadShare<T>`** for critical applications
//! 4. **Monitor performance** under expected load conditions
//! 5. **Use for simple operations** only (increment, add, simple updates)
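//!
//! A rough way to follow practices 2 and 4 is a small, self-contained load check. This is
//! only a sketch (the thread count, iteration count, and reporting are arbitrary choices,
//! not part of the library API):
//!
//! ```rust,no_run
//! use std::thread;
//! use std::time::Instant;
//! use thread_share::ArcThreadShare;
//!
//! let counter = ArcThreadShare::new(0u64);
//! let threads = 8u64;
//! let per_thread = 10_000u64;
//!
//! let start = Instant::now();
//! let handles: Vec<_> = (0..threads)
//!     .map(|_| {
//!         let c = counter.clone();
//!         thread::spawn(move || {
//!             for _ in 0..per_thread {
//!                 c.increment();
//!             }
//!         })
//!     })
//!     .collect();
//!
//! for h in handles {
//!     h.join().unwrap();
//! }
//!
//! // Compare the observed count and elapsed time against your requirements.
//! println!(
//!     "expected {}, observed {}, elapsed {:?}",
//!     threads * per_thread,
//!     counter.get(),
//!     start.elapsed()
//! );
//! ```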
//!
//! ## Alternatives
//!
//! ### For High-Frequency Updates
//! ```rust
//! use thread_share::share;
//!
//! // Use ThreadShare with batching
//! let share = share!(0);
//! let clone = share.clone();
//!
//! clone.update(|x| {
//!     for _ in 0..100 {
//!         *x = *x + 1;
//!     }
//! });
//! ```
//!
//! ### For Critical Data Integrity
//! ```rust
//! use thread_share::share;
//!
//! // Use ThreadShare for guaranteed safety
//! let share = share!(vec![1, 2, 3]);
//! let clone = share.clone();
//!
//! // All operations are guaranteed to succeed
//! clone.update(|data| {
//!     // Critical modifications
//! });
//! ```
//!
//! ### For Safe Zero-Copy
//! ```rust
//! use thread_share::{share, ArcThreadShareLocked};
//!
//! // Use ArcThreadShareLocked for safe zero-copy
//! let share = share!(vec![1, 2, 3]);
//! let arc_data = share.as_arc_locked();
//! let locked_share = ArcThreadShareLocked::from_arc(arc_data);
//!
//! // Safe zero-copy with guaranteed thread safety
//! locked_share.update(|data| {
//!     // Safe modifications
//! });
//! ```

use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;

#[cfg(feature = "serialize")]
use serde::{de::DeserializeOwned, Serialize};

/// Helper structure for working with Arc<AtomicPtr<T>> directly (without locks!)
///
/// **⚠️ WARNING: This structure has significant limitations and should be used with caution!**
///
/// ## Overview
///
/// `ArcThreadShare<T>` provides zero-copy data sharing between threads using atomic
/// pointer operations. While this can offer high performance, it comes with important
/// trade-offs that developers must understand.
///
/// ## Key Features
///
/// - **Zero-Copy Operations**: No data cloning during access
/// - **Atomic Updates**: Uses atomic pointer operations
/// - **High Performance**: Potentially faster than lock-based approaches
/// - **Memory Efficiency**: Single copy of data shared across threads
///
/// ## ⚠️ Critical Limitations
///
/// ### 1. **Non-Atomic Complex Operations**
/// The `update` method is not atomic for read-modify-write operations such as `+= 1`;
/// use the atomic `increment()` and `add()` methods instead.
///
/// ### 2. **High Contention Performance Issues**
/// Under high contention, many operations may be lost due to:
/// - Box allocation/deallocation overhead
/// - CAS failures requiring retries
/// - Memory pressure from frequent allocations
///
/// ### 3. **Memory Allocation Overhead**
/// Every update operation involves Box allocation and deallocation.
///
/// ## When to Use
///
/// - **Low-contention scenarios** (few threads, infrequent updates)
/// - **Performance-critical applications** where you understand the limitations
/// - **Simple atomic operations** using built-in methods
/// - **Read-heavy workloads** with occasional writes
///
/// ## When to Avoid
///
/// - **High-frequency updates** (>1000 ops/second per thread)
/// - **Critical data integrity** requirements
/// - **Predictable performance** needs
/// - **Large data structures**
///
/// ## Example
///
/// ```rust
/// use thread_share::ArcThreadShare;
///
/// let counter = ArcThreadShare::new(0);
///
/// // Use atomic methods for safety
/// counter.increment();
/// counter.add(5);
///
/// assert_eq!(counter.get(), 6);
/// ```
pub struct ArcThreadShare<T> {
    pub data: Arc<AtomicPtr<T>>,
}

// Explicitly mark ArcThreadShare as Send and Sync so it can be moved and shared across threads
unsafe impl<T> Send for ArcThreadShare<T> {}
unsafe impl<T> Sync for ArcThreadShare<T> {}

impl<T> Clone for ArcThreadShare<T> {
    fn clone(&self) -> Self {
        Self {
            data: Arc::clone(&self.data),
        }
    }
}

impl<T> ArcThreadShare<T> {
    /// Creates from Arc<AtomicPtr<T>>
    ///
    /// This method creates an `ArcThreadShare<T>` from an existing `Arc<AtomicPtr<T>>`.
    /// Useful when you already have atomic pointer data from other sources.
    ///
    /// ## Arguments
    ///
    /// * `arc` - An `Arc<AtomicPtr<T>>` containing the data to share
    ///
    /// ## Returns
    ///
    /// A new `ArcThreadShare<T>` instance sharing the same data.
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::{share, ArcThreadShare};
    ///
    /// let data = share!(String::from("Hello"));
    /// let arc_data = data.as_arc();
    /// let arc_share = ArcThreadShare::from_arc(arc_data);
    ///
    /// // Modify the shared data in place (note: `update` is not atomic)
    /// arc_share.update(|s| s.push_str(" World"));
    /// ```
    pub fn from_arc(arc: Arc<AtomicPtr<T>>) -> Self {
        Self { data: arc }
    }

    /// Creates a new ArcThreadShare with data
    ///
    /// This method creates a new `ArcThreadShare<T>` instance with the provided data.
    /// The data is boxed and converted to an atomic pointer for thread-safe sharing.
    ///
    /// ## Arguments
    ///
    /// * `data` - The initial data to share between threads
    ///
    /// ## Requirements
    ///
    /// The type `T` must implement the `Clone` trait.
    ///
    /// ## Returns
    ///
    /// A new `ArcThreadShare<T>` instance containing the data.
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::ArcThreadShare;
    ///
    /// let counter = ArcThreadShare::new(0);
    /// let message = ArcThreadShare::new(String::from("Hello"));
    /// let data = ArcThreadShare::new(vec![1, 2, 3]);
    /// ```
    pub fn new(data: T) -> Self
    where
        T: Clone,
    {
        let boxed = Box::new(data);
        let ptr = Box::into_raw(boxed);
        let atomic = Arc::new(AtomicPtr::new(ptr));
        Self { data: atomic }
    }

    /// Gets a copy of data
    ///
    /// This method retrieves a copy of the current data. The operation is safe
    /// but involves cloning the data.
    ///
    /// ## Requirements
    ///
    /// The type `T` must implement the `Clone` trait.
    ///
    /// ## Returns
    ///
    /// A copy of the current data.
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::ArcThreadShare;
    ///
    /// let counter = ArcThreadShare::new(42);
    /// let value = counter.get();
    /// assert_eq!(value, 42);
    /// ```
    pub fn get(&self) -> T
    where
        T: Clone,
    {
        let ptr = self.data.load(Ordering::Acquire);
        unsafe { (*ptr).clone() }
    }

    /// Sets data atomically
    ///
    /// This method atomically replaces the current data with new data.
    /// The old data is automatically deallocated.
    ///
    /// ## Arguments
    ///
    /// * `new_data` - The new data to set
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::ArcThreadShare;
    ///
    /// let counter = ArcThreadShare::new(0);
    /// counter.set(100);
    /// assert_eq!(counter.get(), 100);
    /// ```
    pub fn set(&self, new_data: T) {
        let new_boxed = Box::new(new_data);
        let new_ptr = Box::into_raw(new_boxed);

        let old_ptr = self.data.swap(new_ptr, Ordering::AcqRel);

        // Free old data
        if !old_ptr.is_null() {
            unsafe {
                drop(Box::from_raw(old_ptr));
            }
        }
    }

    /// Updates data (⚠️ NOT atomic for complex operations!)
    ///
    /// **⚠️ WARNING: This method is NOT atomic for complex operations!**
    ///
    /// For simple operations like `+= 1`, use the atomic methods `increment()` or `add()`
    /// instead. This method can cause race conditions under high contention.
    ///
    /// ## Arguments
    ///
    /// * `f` - Closure that receives a mutable reference to the data
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::ArcThreadShare;
    ///
    /// let counter = ArcThreadShare::new(0);
    ///
    /// // ❌ NOT atomic - can cause race conditions
    /// counter.update(|x| *x += 1);
    ///
    /// // ✅ Use atomic methods instead
    /// counter.increment();
    /// ```
    pub fn update<F>(&self, f: F)
    where
        F: FnOnce(&mut T),
    {
        let ptr = self.data.load(Ordering::Acquire);
        if !ptr.is_null() {
            unsafe {
                f(&mut *ptr);
            }
        }
    }

    /// Atomically increments numeric values (for types that support it)
    ///
    /// This method provides atomic increment operations for numeric types.
    /// It uses a compare-exchange loop to ensure atomicity.
    ///
    /// ## Requirements
    ///
    /// The type `T` must implement:
    /// - `Copy` - for efficient copying
    /// - `std::ops::Add<Output = T>` - for addition operations
    /// - `std::ops::AddAssign` - for compound assignment
    /// - `From<u8>` - for creating the value 1
    /// - `'static` - for lifetime requirements
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::ArcThreadShare;
    ///
    /// let counter = ArcThreadShare::new(0);
    ///
    /// // Atomic increment
    /// counter.increment();
    /// assert_eq!(counter.get(), 1);
    ///
    /// counter.increment();
    /// assert_eq!(counter.get(), 2);
    /// ```
    pub fn increment(&self)
    where
        T: Copy + std::ops::Add<Output = T> + std::ops::AddAssign + From<u8> + 'static,
    {
        loop {
            let ptr = self.data.load(Ordering::Acquire);
            if ptr.is_null() {
                break;
            }

            let current_value = unsafe { *ptr };
            let new_value = current_value + T::from(1u8);

            // Try to atomically update the pointer with new data
            let new_boxed = Box::new(new_value);
            let new_ptr = Box::into_raw(new_boxed);

            if self
                .data
                .compare_exchange(ptr, new_ptr, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                // Successfully updated, free old data
                unsafe {
                    drop(Box::from_raw(ptr));
                }
                break;
            } else {
                // Failed to update, free new data and retry
                unsafe {
                    drop(Box::from_raw(new_ptr));
                }
            }
        }
    }

    /// Atomically adds a value (for types that support it)
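    ///
    /// A short usage sketch (mirroring the module-level example):
    ///
    /// ```rust
    /// use thread_share::ArcThreadShare;
    ///
    /// let counter = ArcThreadShare::new(10);
    /// counter.add(5);
    /// assert_eq!(counter.get(), 15);
    /// ```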
    pub fn add(&self, value: T)
    where
        T: Copy + std::ops::Add<Output = T> + std::ops::AddAssign + 'static,
    {
        loop {
            let ptr = self.data.load(Ordering::Acquire);
            if ptr.is_null() {
                break;
            }

            let current_value = unsafe { *ptr };
            let new_value = current_value + value;

            // Try to atomically update the pointer with new data
            let new_boxed = Box::new(new_value);
            let new_ptr = Box::into_raw(new_boxed);

            if self
                .data
                .compare_exchange(ptr, new_ptr, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                // Successfully updated, free old data
                unsafe {
                    drop(Box::from_raw(ptr));
                }
                break;
            } else {
                // Failed to update, free new data and retry
                unsafe {
                    drop(Box::from_raw(new_ptr));
                }
            }
        }
    }

    /// Reads data by applying a closure to a shared reference to the current value.
    ///
    /// Returns whatever the closure returns. Panics if the internal pointer is null.
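    ///
    /// ## Example
    ///
    /// A small usage sketch:
    ///
    /// ```rust
    /// use thread_share::ArcThreadShare;
    ///
    /// let numbers = ArcThreadShare::new(vec![1, 2, 3]);
    /// let len = numbers.read(|v| v.len());
    /// assert_eq!(len, 3);
    /// ```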
    pub fn read<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&T) -> R,
    {
        let ptr = self.data.load(Ordering::Acquire);
        if !ptr.is_null() {
            unsafe { f(&*ptr) }
        } else {
            panic!("Attempted to read from null pointer");
        }
    }

    /// Writes data by applying a closure to a mutable reference to the current value.
    ///
    /// Returns whatever the closure returns. Like `update`, this is not atomic across
    /// threads. Panics if the internal pointer is null.
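    ///
    /// ## Example
    ///
    /// A small usage sketch:
    ///
    /// ```rust
    /// use thread_share::ArcThreadShare;
    ///
    /// let numbers = ArcThreadShare::new(vec![1, 2, 3]);
    /// let new_len = numbers.write(|v| {
    ///     v.push(4);
    ///     v.len()
    /// });
    /// assert_eq!(new_len, 4);
    /// assert_eq!(numbers.get(), vec![1, 2, 3, 4]);
    /// ```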
    pub fn write<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut T) -> R,
    {
        let ptr = self.data.load(Ordering::Acquire);
        if !ptr.is_null() {
            unsafe { f(&mut *ptr) }
        } else {
            panic!("Attempted to write to null pointer");
        }
    }

    /// Serializes a copy of the current data to a JSON string (requires the `serialize` feature).
    #[cfg(feature = "serialize")]
    pub fn to_json(&self) -> Result<String, serde_json::Error>
    where
        T: Serialize + Clone,
    {
        serde_json::to_string(&self.get())
    }

    /// Deserializes a value of type `D` from a JSON string (requires the `serialize` feature).
    ///
    /// Note that this is a plain parsing helper: it returns the deserialized value and does
    /// not modify the shared data.
    #[cfg(feature = "serialize")]
    pub fn from_json<D>(&self, json: &str) -> Result<D, serde_json::Error>
    where
        D: DeserializeOwned,
    {
        serde_json::from_str(json)
    }
}

/// Helper structure for working with Arc<Mutex<T>> directly
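///
/// A small usage sketch (assuming `ArcSimpleShare` is re-exported from the crate root like
/// `ArcThreadShare`; adjust the import path if it is not):
///
/// ```rust
/// use std::sync::{Arc, Mutex};
/// use thread_share::ArcSimpleShare;
///
/// let shared = ArcSimpleShare::from_arc(Arc::new(Mutex::new(0)));
/// shared.set(10);
/// shared.update(|x| *x += 5);
/// assert_eq!(shared.get(), 15);
/// ```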
pub struct ArcSimpleShare<T> {
    pub data: Arc<std::sync::Mutex<T>>,
}

// Explicitly mark ArcSimpleShare as Send and Sync so it can be moved and shared across threads
unsafe impl<T> Send for ArcSimpleShare<T> {}
unsafe impl<T> Sync for ArcSimpleShare<T> {}

impl<T> ArcSimpleShare<T> {
    /// Creates from Arc<Mutex<T>>
    pub fn from_arc(arc: Arc<std::sync::Mutex<T>>) -> Self {
        Self { data: arc }
    }

    /// Gets data
    pub fn get(&self) -> T
    where
        T: Clone,
    {
        self.data.lock().unwrap().clone()
    }

    /// Sets data
    pub fn set(&self, new_data: T) {
        let mut data = self.data.lock().unwrap();
        *data = new_data;
    }

    /// Updates data
    pub fn update<F>(&self, f: F)
    where
        F: FnOnce(&mut T),
    {
        let mut data = self.data.lock().unwrap();
        f(&mut data);
    }
}