thread_share/atomic.rs
1//! # Atomic Module - ArcThreadShare<T>
2//!
3//! This module provides `ArcThreadShare<T>`, a high-performance structure for
4//! zero-copy data sharing between threads using atomic operations.
5//!
6//! ## ⚠️ Important Warning
7//!
8//! **`ArcThreadShare<T>` has significant limitations and should be used with caution!**
9//!
10//! ## Overview
11//!
12//! `ArcThreadShare<T>` uses `Arc<AtomicPtr<T>>` internally to provide zero-copy
13//! data sharing without locks. While this can offer high performance, it comes
14//! with important trade-offs.
15//!
16//! ## Key Features
17//!
18//! - **Zero-Copy Operations**: No data cloning during access
19//! - **Atomic Updates**: Uses atomic pointer operations
20//! - **High Performance**: Potentially faster than lock-based approaches
21//! - **Memory Efficiency**: Single copy of data shared across threads
22//!
23//! ## ⚠️ Critical Limitations
24//!
25//! ### 1. **Non-Atomic Complex Operations**
26//! ```rust
27//! use thread_share::ArcThreadShare;
28//!
29//! let arc_share = ArcThreadShare::new(0);
30//!
31//! // ❌ This is NOT atomic and can cause race conditions
32//! arc_share.update(|x| *x = *x + 1);
33//!
34//! // ✅ Use the atomic increment method instead
35//! arc_share.increment();
36//! ```
37//!
38//! **Problem**: The `update` method with complex operations like `+=` is not atomic.
39//! Between reading the value, modifying it, and writing it back, other threads can interfere.
40//!
41//! ### 2. **High Contention Performance Issues**
42//! ```rust
43//! use thread_share::ArcThreadShare;
44//!
45//! let arc_share = ArcThreadShare::new(0);
46//!
47//! // ❌ High contention can cause significant performance degradation
48//! for _ in 0..10000 {
49//! arc_share.increment(); // May lose many operations under high contention
50//! }
51//! ```
52//!
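//! **Problem**: Under high contention (many threads updating simultaneously), updates made
//! through the `AtomicPtr` can be lost. Every `increment()`/`add()` allocates a new `Box`,
//! installs it with CAS (Compare-And-Swap), and frees the old `Box` without waiting for other
//! threads that may still be reading it. Contributing factors:
//! - Box allocation/deallocation overhead (a freed address can be handed out again while
//!   another thread's CAS is still comparing against it)
//! - CAS (Compare-And-Swap) failures requiring retries
//! - Memory pressure from frequent allocations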
58//!
59//! **Expected Behavior**: In high-contention scenarios, you may see only 20-30% of
60//! expected operations complete successfully.
61//!
62//! ### 3. **Memory Allocation Overhead**
63//! ```rust
64//! use thread_share::ArcThreadShare;
65//!
66//! let arc_share = ArcThreadShare::new(0);
67//!
68//! // Each increment operation involves:
69//! // 1. Allocating new Box<T>
70//! // 2. Converting to raw pointer
71//! // 3. Atomic pointer swap
72//! // 4. Deallocating old Box<T>
73//! arc_share.increment();
74//! ```
75//!
76//! **Problem**: Every update operation creates a new `Box<T>` and deallocates the old one,
77//! which can be expensive for large data types.
78//!
79//! ## When to Use ArcThreadShare<T>
80//!
81//! ### ✅ Good Use Cases
82//! - **Low-contention scenarios** (few threads, infrequent updates)
83//! - **Performance-critical applications** where you understand the limitations
84//! - **Simple atomic operations** using built-in methods (`increment()`, `add()`)
85//! - **Read-heavy workloads** with occasional writes
86//!
87//! ### ❌ Avoid When
88//! - **High-frequency updates** (>1000 ops/second per thread)
89//! - **Critical data integrity** requirements
90//! - **Predictable performance** needs
91//! - **Large data structures** (due to allocation overhead)
92//! - **Multi-threaded counters** with strict accuracy requirements
93//!
94//! ## Example Usage
95//!
96//! ### Basic Operations
97//! ```rust
98//! use thread_share::ArcThreadShare;
99//!
100//! let counter = ArcThreadShare::new(0);
101//!
102//! // Use atomic methods for safety
103//! counter.increment();
104//! counter.add(5);
105//!
106//! assert_eq!(counter.get(), 6);
107//! ```
108//!
109//! ### From ThreadShare
110//! ```rust
111//! use thread_share::{share, ArcThreadShare};
112//!
113//! let data = share!(String::from("Hello"));
114//! let arc_data = data.as_arc();
115//! let arc_share = ArcThreadShare::from_arc(arc_data);
116//!
//! // In-place mutation; note that `update` is NOT an atomic read-modify-write
118//! arc_share.update(|s| s.push_str(" World"));
119//! ```
120//!
121//! ## Performance Characteristics
122//!
123//! - **Low Contention**: Excellent performance, minimal overhead
124//! - **Medium Contention**: Good performance with some lost operations
125//! - **High Contention**: Poor performance, many lost operations
126//! - **Memory Usage**: Higher due to Box allocation/deallocation
127//!
128//! ## Best Practices
129//!
130//! 1. **Always use atomic methods** (`increment()`, `add()`) instead of complex `update()` operations
131//! 2. **Test with realistic contention levels** before production use
132//! 3. **Consider `ThreadShare<T>`** for critical applications
133//! 4. **Monitor performance** under expected load conditions
134//! 5. **Use for simple operations** only (increment, add, simple updates)
135//!
136//! ## Alternatives
137//!
138//! ### For High-Frequency Updates
139//! ```rust
140//! use thread_share::share;
141//!
142//! // Use ThreadShare with batching
143//! let share = share!(0);
144//! let clone = share.clone();
145//!
146//! clone.update(|x| {
147//! for _ in 0..100 {
148//! *x = *x + 1;
149//! }
150//! });
151//! ```
152//!
153//! ### For Critical Data Integrity
154//! ```rust
155//! use thread_share::share;
156//!
157//! // Use ThreadShare for guaranteed safety
158//! let share = share!(vec![1, 2, 3]);
159//! let clone = share.clone();
160//!
161//! // All operations are guaranteed to succeed
162//! clone.update(|data| {
163//! // Critical modifications
164//! });
165//! ```
166//!
167//! ### For Safe Zero-Copy
168//! ```rust
169//! use thread_share::{share, ArcThreadShareLocked};
170//!
171//! // Use ArcThreadShareLocked for safe zero-copy
172//! let share = share!(vec![1, 2, 3]);
173//! let arc_data = share.as_arc_locked();
174//! let locked_share = ArcThreadShareLocked::from_arc(arc_data);
175//!
176//! // Safe zero-copy with guaranteed thread safety
177//! locked_share.update(|data| {
178//! // Safe modifications
179//! });
180//! ```
181
182use std::sync::atomic::{AtomicPtr, Ordering};
183use std::sync::Arc;
184
185#[cfg(feature = "serialize")]
186use serde::{de::DeserializeOwned, Serialize};
187
/// Helper structure for working with `Arc<AtomicPtr<T>>` directly (without locks!)
///
/// **⚠️ WARNING: This structure has significant limitations and should be used with caution!**
///
/// ## Overview
///
/// The shared value lives in a `Box<T>` whose raw pointer is stored in an
/// [`AtomicPtr`]; clones of the handle share that single pointer cell through an [`Arc`].
///
/// - [`set`](Self::set), [`add`](Self::add) and [`increment`](Self::increment) publish a
///   freshly allocated box and free the one they replaced.
/// - [`update`](Self::update) and [`write`](Self::write) mutate the current box in place.
/// - [`get`](Self::get) and [`read`](Self::read) dereference the current box.
///
/// ## ⚠️ Critical Limitations
///
/// ### 1. **No Protection Between Concurrent Accessors**
/// Only the pointer swap itself is atomic. Nothing stops one thread from freeing a box
/// (`set`, `add`, `increment`) while another thread is still dereferencing it, and nothing
/// serialises in-place mutation (`update`, `write`) against other readers or writers.
/// Callers must arrange that such calls never overlap.
///
/// ### 2. **High Contention Performance Issues**
/// Under high contention the hazard above becomes likely and operations may be lost.
/// Every attempt additionally pays for:
/// - Box allocation/deallocation overhead
/// - CAS failures requiring retries
/// - Memory pressure from frequent allocations
///
/// ### 3. **Memory Allocation Overhead**
/// Every replacing operation (`set`, `add`, `increment`) allocates a new box and frees
/// the old one.
///
/// ### 4. **The Last Value Is Not Freed Here**
/// No `Drop` implementation accompanies this type in this file, so the box that is
/// current when the last handle goes away is not reclaimed by this code.
///
/// ## When to Use
///
/// - **Low-contention scenarios** (few threads, infrequent updates)
/// - **Performance-critical applications** where you understand the limitations
/// - **Simple atomic operations** using built-in methods
/// - **Read-heavy workloads** with occasional writes
///
/// ## When to Avoid
///
/// - **High-frequency updates** (>1000 ops/second per thread)
/// - **Critical data integrity** requirements
/// - **Predictable performance** needs
/// - **Large data structures**
///
/// ## Example
///
/// ```rust
/// use thread_share::ArcThreadShare;
///
/// let counter = ArcThreadShare::new(0);
///
/// counter.increment();
/// counter.add(5);
///
/// assert_eq!(counter.get(), 6);
/// ```
pub struct ArcThreadShare<T> {
    /// Shared cell holding the raw pointer to the current boxed value.
    ///
    /// The methods of this type assume the pointer is either null or was produced by
    /// `Box::into_raw`, because replaced pointers are released with `Box::from_raw`.
    pub data: Arc<AtomicPtr<T>>,
}

// SAFETY: a handle can move the value to, drop it on, and hand out `&T`/`&mut T` from
// whichever thread holds a clone, so `T` itself must be both `Send` and `Sync` (the same
// requirement `Arc<T>` has). Writing the impls explicitly also replaces the auto impls,
// which would otherwise be unconditional because `AtomicPtr<T>` is `Send + Sync` for
// every `T`.
unsafe impl<T: Send + Sync> Send for ArcThreadShare<T> {}
unsafe impl<T: Send + Sync> Sync for ArcThreadShare<T> {}

impl<T> Clone for ArcThreadShare<T> {
    /// Returns another handle to the same pointer cell; the value is not copied.
    fn clone(&self) -> Self {
        Self {
            data: Arc::clone(&self.data),
        }
    }
}

impl<T> ArcThreadShare<T> {
    /// Creates from `Arc<AtomicPtr<T>>`
    ///
    /// Wraps an existing pointer cell without touching the pointer it holds.
    ///
    /// ## Arguments
    ///
    /// * `arc` - The pointer cell to share. The stored pointer must be null or come from
    ///   `Box::into_raw`, because `set`, `add` and `increment` free the pointer they
    ///   replace with `Box::from_raw`.
    ///
    /// ## Returns
    ///
    /// A new `ArcThreadShare<T>` instance sharing the same pointer cell.
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::{share, ArcThreadShare};
    ///
    /// let data = share!(String::from("Hello"));
    /// let arc_data = data.as_arc();
    /// let arc_share = ArcThreadShare::from_arc(arc_data);
    ///
    /// // In-place mutation; not an atomic read-modify-write
    /// arc_share.update(|s| s.push_str(" World"));
    /// ```
    pub fn from_arc(arc: Arc<AtomicPtr<T>>) -> Self {
        Self { data: arc }
    }

    /// Creates a new ArcThreadShare with data
    ///
    /// The value is moved into a `Box`, and the box's raw pointer becomes the initial
    /// content of a fresh pointer cell.
    ///
    /// ## Arguments
    ///
    /// * `data` - The initial data to share between threads
    ///
    /// ## Returns
    ///
    /// A new `ArcThreadShare<T>` instance containing the data.
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::ArcThreadShare;
    ///
    /// let counter = ArcThreadShare::new(0);
    /// let message = ArcThreadShare::new(String::from("Hello"));
    /// let data = ArcThreadShare::new(vec![1, 2, 3]);
    /// ```
    pub fn new(data: T) -> Self {
        // Nothing here clones, so no `Clone` bound is required.
        let raw = Box::into_raw(Box::new(data));
        Self {
            data: Arc::new(AtomicPtr::new(raw)),
        }
    }

    /// Gets a copy of data
    ///
    /// Clones the value behind the current pointer.
    ///
    /// ## Requirements
    ///
    /// The type `T` must implement `Clone` trait.
    ///
    /// ## Returns
    ///
    /// A copy of the current data.
    ///
    /// ## Panics
    ///
    /// Panics if the stored pointer is null.
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::ArcThreadShare;
    ///
    /// let counter = ArcThreadShare::new(42);
    /// let value = counter.get();
    /// assert_eq!(value, 42);
    /// ```
    pub fn get(&self) -> T
    where
        T: Clone,
    {
        // Going through `read` gives `get` the same null handling as the other readers
        // instead of dereferencing a possibly-null pointer.
        self.read(T::clone)
    }

    /// Sets data atomically
    ///
    /// Boxes `new_data`, swaps its pointer into the cell in one atomic step, and frees
    /// the box that was replaced (if the previous pointer was non-null).
    ///
    /// ## Arguments
    ///
    /// * `new_data` - The new data to set
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::ArcThreadShare;
    ///
    /// let counter = ArcThreadShare::new(0);
    /// counter.set(100);
    /// assert_eq!(counter.get(), 100);
    /// ```
    pub fn set(&self, new_data: T) {
        let fresh = Box::into_raw(Box::new(new_data));
        let replaced = self.data.swap(fresh, Ordering::AcqRel);

        if !replaced.is_null() {
            // SAFETY: relies on the type-level assumption that every non-null pointer in
            // the cell came from `Box::into_raw`; the swap removed it from the cell, so
            // it is released exactly once. Not enforced: another thread that loaded this
            // pointer earlier may still be using it.
            unsafe { drop(Box::from_raw(replaced)) };
        }
    }

    /// Updates data (⚠️ NOT atomic for complex operations!)
    ///
    /// **⚠️ WARNING: This method is NOT atomic for complex operations!**
    ///
    /// The closure mutates the current value in place without any synchronisation.
    /// For simple operations like `+= 1`, use `increment()` or `add()` instead.
    /// Does nothing if the stored pointer is null.
    ///
    /// ## Arguments
    ///
    /// * `f` - Closure that receives a mutable reference to the data
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::ArcThreadShare;
    ///
    /// let counter = ArcThreadShare::new(0);
    ///
    /// // ❌ NOT atomic - can cause race conditions
    /// counter.update(|x| *x += 1);
    ///
    /// // ✅ Use atomic methods instead
    /// counter.increment();
    /// ```
    pub fn update<F>(&self, f: F)
    where
        F: FnOnce(&mut T),
    {
        let ptr = self.data.load(Ordering::Acquire);
        // SAFETY: a non-null pointer in the cell is assumed to point at a live boxed `T`.
        // Exclusivity of the `&mut T` is NOT enforced; the caller must ensure no other
        // access overlaps this call.
        if let Some(value) = unsafe { ptr.as_mut() } {
            f(value);
        }
    }

    /// Atomically increments numeric values (for types that support it)
    ///
    /// Equivalent to `add(T::from(1u8))`.
    ///
    /// ## Requirements
    ///
    /// The type `T` must implement:
    /// - `Copy` - for efficient copying
    /// - `std::ops::Add<Output = T>` - for addition operations
    /// - `From<u8>` - for creating the value 1
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::ArcThreadShare;
    ///
    /// let counter = ArcThreadShare::new(0);
    ///
    /// counter.increment();
    /// assert_eq!(counter.get(), 1);
    ///
    /// counter.increment();
    /// assert_eq!(counter.get(), 2);
    /// ```
    pub fn increment(&self)
    where
        T: Copy + std::ops::Add<Output = T> + From<u8>,
    {
        self.add(T::from(1u8));
    }

    /// Atomically adds a value (for types that support it)
    ///
    /// Reads the current value, stores `current + value` in a new box and installs that
    /// box with a compare-exchange on the pointer, retrying until the exchange succeeds.
    /// The replaced box is freed. Does nothing if the stored pointer is (or becomes)
    /// null.
    ///
    /// ## Arguments
    ///
    /// * `value` - The amount to add to the current value
    pub fn add(&self, value: T)
    where
        T: Copy + std::ops::Add<Output = T>,
    {
        let mut current = self.data.load(Ordering::Acquire);
        if current.is_null() {
            return;
        }

        // One allocation is reused across retries instead of a new box per attempt.
        // SAFETY: `current` is non-null and assumed to point at a live boxed `T`. Not
        // enforced: another thread may replace and free it between the load and this
        // read.
        let mut staged = Box::new(unsafe { *current } + value);
        loop {
            let staged_ptr = Box::into_raw(staged);
            match self.data.compare_exchange(
                current,
                staged_ptr,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(replaced) => {
                    // SAFETY: the exchange removed `replaced` from the cell, so it is
                    // released exactly once here; it is assumed to come from
                    // `Box::into_raw`.
                    unsafe { drop(Box::from_raw(replaced)) };
                    return;
                }
                Err(observed) => {
                    // SAFETY: the exchange failed, so `staged_ptr` was never published
                    // and this thread is still its only owner.
                    staged = unsafe { Box::from_raw(staged_ptr) };
                    if observed.is_null() {
                        // Nothing to add to; `staged` is dropped on return.
                        return;
                    }
                    current = observed;
                    // SAFETY: same assumption as the initial read above.
                    *staged = unsafe { *current } + value;
                }
            }
        }
    }

    /// Reads data
    ///
    /// Calls `f` with a shared reference to the current value and returns its result.
    ///
    /// ## Panics
    ///
    /// Panics if the stored pointer is null.
    pub fn read<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&T) -> R,
    {
        let ptr = self.data.load(Ordering::Acquire);
        // SAFETY: a non-null pointer in the cell is assumed to point at a live boxed `T`.
        // Not enforced: a concurrent `set`/`add`/`increment` may free it during `f`.
        match unsafe { ptr.as_ref() } {
            Some(value) => f(value),
            None => panic!("Attempted to read from null pointer"),
        }
    }

    /// Writes data
    ///
    /// Calls `f` with a mutable reference to the current value and returns its result.
    /// Like `update`, the mutation happens in place without synchronisation.
    ///
    /// ## Panics
    ///
    /// Panics if the stored pointer is null.
    pub fn write<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut T) -> R,
    {
        let ptr = self.data.load(Ordering::Acquire);
        // SAFETY: a non-null pointer in the cell is assumed to point at a live boxed `T`.
        // Exclusivity of the `&mut T` is NOT enforced; the caller must ensure no other
        // access overlaps this call.
        match unsafe { ptr.as_mut() } {
            Some(value) => f(value),
            None => panic!("Attempted to write to null pointer"),
        }
    }

    /// Serializes a clone of the current value to a JSON string.
    ///
    /// ## Panics
    ///
    /// Panics if the stored pointer is null (via `get`).
    #[cfg(feature = "serialize")]
    pub fn to_json(&self) -> Result<String, serde_json::Error>
    where
        T: Serialize + Clone,
    {
        serde_json::to_string(&self.get())
    }

    /// Deserializes `json` into a `D` and returns it.
    ///
    /// The shared value is neither read nor modified; `self` is unused.
    #[cfg(feature = "serialize")]
    pub fn from_json<D>(&self, json: &str) -> Result<D, serde_json::Error>
    where
        D: DeserializeOwned,
    {
        serde_json::from_str(json)
    }
}
563
/// Helper structure for working with `Arc<Mutex<T>>` directly
///
/// Every accessor locks the mutex for the duration of the call, so operations on the
/// same `data` are serialised.
pub struct ArcSimpleShare<T> {
    /// The shared, mutex-protected value.
    pub data: Arc<std::sync::Mutex<T>>,
}

// `Send` and `Sync` are intentionally NOT implemented by hand: the auto traits derived
// from `Arc<Mutex<T>>` already make this type `Send + Sync` exactly when `T: Send`.
// An unconditional `unsafe impl` would let a handle over a non-`Send` `T` (e.g. `Rc`)
// cross threads.

impl<T> ArcSimpleShare<T> {
    /// Creates from `Arc<Mutex<T>>`
    ///
    /// ## Arguments
    ///
    /// * `arc` - The mutex-protected value to wrap; it is stored as-is.
    pub fn from_arc(arc: Arc<std::sync::Mutex<T>>) -> Self {
        Self { data: arc }
    }

    /// Gets data
    ///
    /// Locks the mutex and returns a clone of the value.
    ///
    /// ## Panics
    ///
    /// Panics if the mutex is poisoned.
    pub fn get(&self) -> T
    where
        T: Clone,
    {
        let guard = self.data.lock().unwrap();
        guard.clone()
    }

    /// Sets data
    ///
    /// Locks the mutex and overwrites the value with `new_data`; the previous value is
    /// dropped while the lock is held.
    ///
    /// ## Panics
    ///
    /// Panics if the mutex is poisoned.
    pub fn set(&self, new_data: T) {
        let mut guard = self.data.lock().unwrap();
        *guard = new_data;
    }

    /// Updates data
    ///
    /// Locks the mutex and runs `f` on the value; the lock is held until `f` returns.
    ///
    /// ## Arguments
    ///
    /// * `f` - Closure that receives a mutable reference to the data
    ///
    /// ## Panics
    ///
    /// Panics if the mutex is poisoned.
    pub fn update<F>(&self, f: F)
    where
        F: FnOnce(&mut T),
    {
        let mut guard = self.data.lock().unwrap();
        f(&mut *guard);
    }
}