rcu_128/
lib.rs

1#![cfg(target_has_atomic = "128")]
2#![feature(integer_atomics)]
3#![no_std]
4extern crate alloc;
5use alloc::boxed::Box;
6use parking_lot::RwLock;
7
8use core::{
9    hint,
10    marker::PhantomData,
11    ops::Deref,
12    ptr::NonNull,
13    sync::atomic::{AtomicU128, Ordering},
14};
15
16/// A guard that provides read access to a value in an `RcuCell`.
17///
18/// When this guard is dropped, it will signal that the read operation
19/// is complete, allowing the `RcuCell` to manage its internal state
20/// accordingly.
21pub struct RcuGuard<'a, T> {
22    ptr: NonNull<T>,
23    cell: &'a RcuCell<T>,
24}
25
26impl<T> Deref for RcuGuard<'_, T> {
27    type Target = T;
28    fn deref(&self) -> &T {
29        unsafe { self.ptr.as_ref() }
30    }
31}
32
33impl<T> Drop for RcuGuard<'_, T> {
34    fn drop(&mut self) {
35        // Try to decrement ptr_counter_latest first
36        loop {
37            let ptr_counter = self.cell.ptr_counter_latest.load(Ordering::Acquire);
38            if (ptr_counter >> 64) as usize == self.ptr.as_ptr() as usize {
39                if self
40                    .cell
41                    .ptr_counter_latest
42                    .compare_exchange_weak(
43                        ptr_counter,
44                        ptr_counter - 1,
45                        Ordering::AcqRel,
46                        Ordering::Relaxed,
47                    )
48                    .is_ok()
49                {
50                    return;
51                }
52            } else {
53                // ptr_counter_latest has been updated, so we can't decrement it
54                break;
55            }
56            hint::spin_loop();
57        }
58        // Decrement ptr_counter_to_clear
59        loop {
60            let ptr_counter = self.cell.ptr_counter_to_clear.load(Ordering::Acquire);
61            if (ptr_counter >> 64) as usize == self.ptr.as_ptr() as usize
62                && self
63                    .cell
64                    .ptr_counter_to_clear
65                    .compare_exchange_weak(
66                        ptr_counter,
67                        ptr_counter - 1,
68                        Ordering::AcqRel,
69                        Ordering::Relaxed,
70                    )
71                    .is_ok()
72            {
73                return;
74            }
75            hint::spin_loop();
76        }
77    }
78}
79
80/// A concurrent data structure that allows for safe, read-copy-update (RCU)
81/// style access to its value.
82pub struct RcuCell<T> {
83    ptr_counter_latest: AtomicU128,
84    ptr_counter_to_clear: AtomicU128,
85    data: PhantomData<T>,
86    update_token: RwLock<()>,
87}
88
89impl<T> RcuCell<T> {
90    /// Creates a new `RcuCell` with the given initial value.
91    ///
92    /// This function initializes a new `RcuCell` instance, setting its
93    /// initial value to the provided `value`.
94    ///
95    /// # Arguments
96    ///
97    /// * `value` - The initial value to store in the `RcuCell`.
98    ///
99    /// # Returns
100    ///
101    /// A new instance of `RcuCell` containing the provided initial value.
102    ///
103    /// # Example
104    ///
105    /// ```
106    /// let rcu_cell = rcu_128::RcuCell::new(42);
107    /// ```
108    pub fn new(value: T) -> Self {
109        Self {
110            ptr_counter_latest: AtomicU128::new((Box::into_raw(Box::new(value)) as u128) << 64),
111            ptr_counter_to_clear: AtomicU128::new(0),
112            data: PhantomData,
113            update_token: RwLock::new(()),
114        }
115    }
116
117    /// Provides read access to the value stored in the `RcuCell`.
118    ///
119    /// This function returns an `RcuGuard`, which allows for safe,
120    /// concurrent read access to the `RcuCell`'s value.
121    ///
122    /// Once all `RcuGuard` instances referencing a particular value are
123    /// dropped, the value can be safely released during an update or write.
124    ///
125    /// # Example
126    ///
127    /// ```
128    /// let rcu_cell = rcu_128::RcuCell::new(42);
129    /// {
130    ///     let guard = rcu_cell.read();
131    ///     assert_eq!(*guard, 42);
132    /// }
133    /// ```
134    pub fn read(&self) -> RcuGuard<'_, T> {
135        let ptr = unsafe {
136            NonNull::new_unchecked(
137                (self.ptr_counter_latest.fetch_add(1, Ordering::AcqRel) >> 64) as usize as *mut T,
138            )
139        };
140        RcuGuard { cell: self, ptr }
141    }
142
143    /// Writes a new value into the `RcuCell`.
144    ///
145    /// This function immediately writes the new value into the `RcuCell`.
146    /// It will block until all current readers have finished reading
147    /// the old value.
148    ///
149    /// Once all readers have completed their read operations, the
150    /// old value will be safely released.
151    ///
152    /// # Arguments
153    ///
154    /// * `value` - The new value to store in the `RcuCell`.
155    ///
156    /// # Example
157    ///
158    /// ```
159    /// let rcu_cell = rcu_128::RcuCell::new(42);
160    /// rcu_cell.write(100);
161    /// {
162    ///     let guard = rcu_cell.read();
163    ///     assert_eq!(*guard, 100);
164    /// }
165    /// ```
166    pub fn write(&self, value: T) {
167        let new_ptr_counter = (Box::into_raw(Box::new(value)) as u128) << 64;
168        let token_shared = self.update_token.read();
169        let old_ptr_counter = self
170            .ptr_counter_latest
171            .swap(new_ptr_counter, Ordering::AcqRel);
172        drop(token_shared);
173        self.clear(old_ptr_counter);
174    }
175
176    /// Updates the value stored in the `RcuCell` using a provided function.
177    ///
178    /// This function applies the given closure `f` to the current value
179    /// stored in the `RcuCell`, replacing it with the new value returned
180    /// by the closure. It will block until all current readers have finished
181    /// reading the old value.
182    ///
183    /// Once all readers have completed their read operations, the old value
184    /// will be safely released.
185    ///
186    /// # Arguments
187    ///
188    /// * `f` - A closure that takes a reference to the current value and returns
189    ///         a new value to store in the `RcuCell`.
190    ///
191    /// # Example
192    ///
193    /// ```
194    /// let rcu_cell = rcu_128::RcuCell::new(42);
195    /// rcu_cell.update(|&old_value| old_value + 1);
196    /// {
197    ///     let guard = rcu_cell.read();
198    ///     assert_eq!(*guard, 43);
199    /// }
200    /// ```
201    pub fn update(&self, f: impl FnOnce(&T) -> T) {
202        let token_exclusive = self.update_token.write();
203        let old_value =
204            unsafe { &*((self.ptr_counter_latest.load(Ordering::Acquire) >> 64) as *const T) };
205        let new_value = f(old_value);
206        let new_ptr_counter = (Box::into_raw(Box::new(new_value)) as u128) << 64;
207        let old_ptr_counter = self
208            .ptr_counter_latest
209            .swap(new_ptr_counter, Ordering::AcqRel);
210        drop(token_exclusive);
211        self.clear(old_ptr_counter);
212    }
213
214    /// Clears the old value from memory once it is no longer needed.
215    ///
216    /// This function is called internally to release the memory of the old
217    /// value after it has been replaced by a new value. It ensures that all
218    /// readers have completed their read operations on the old value before
219    /// freeing the memory.
220    ///
221    /// # Arguments
222    ///
223    /// * `old_ptr_counter` - The old pointer and counter value to be cleared.
224    ///
225    /// This function does not need to be called directly by users of the
226    /// `RcuCell`.
227    fn clear(&self, old_ptr_counter: u128) {
228        if old_ptr_counter & 0xffff_ffff_ffff_ffff == 0 {
229            // No reader, release memory directly
230            unsafe {
231                let _ = Box::from_raw((old_ptr_counter >> 64) as usize as *mut T);
232            }
233            return;
234        }
235
236        // Only one thread can clear ptr_counter_to_clear at the same time
237        while self
238            .ptr_counter_to_clear
239            .compare_exchange_weak(0, old_ptr_counter, Ordering::AcqRel, Ordering::Relaxed)
240            .is_err()
241        {
242            // Inner loop to only get shared memory access (MESI protocal)
243            while self.ptr_counter_to_clear.load(Ordering::Relaxed) != 0 {
244                hint::spin_loop();
245            }
246        }
247
248        // No need to use CAS here because when the counter is 0,
249        // it will not be updated by other threads
250        //
251        // Wait for all readers to finish
252        while self.ptr_counter_to_clear.load(Ordering::Acquire) & 0xffff_ffff_ffff_ffff != 0 {
253            hint::spin_loop();
254        }
255        // Clear ptr_counter_to_clear to allow other writers to release memory
256        self.ptr_counter_to_clear.store(0, Ordering::Release);
257        unsafe {
258            let _ = Box::from_raw((old_ptr_counter >> 64) as usize as *mut T);
259        }
260    }
261}