rcu_128/lib.rs
1#![cfg(target_has_atomic = "128")]
2#![feature(integer_atomics)]
3#![no_std]
4extern crate alloc;
5use alloc::boxed::Box;
6use parking_lot::RwLock;
7
8use core::{
9 hint,
10 marker::PhantomData,
11 ops::Deref,
12 ptr::NonNull,
13 sync::atomic::{AtomicU128, Ordering},
14};
15
16/// A guard that provides read access to a value in an `RcuCell`.
17///
18/// When this guard is dropped, it will signal that the read operation
19/// is complete, allowing the `RcuCell` to manage its internal state
20/// accordingly.
21pub struct RcuGuard<'a, T> {
22 ptr: NonNull<T>,
23 cell: &'a RcuCell<T>,
24}
25
26impl<T> Deref for RcuGuard<'_, T> {
27 type Target = T;
28 fn deref(&self) -> &T {
29 unsafe { self.ptr.as_ref() }
30 }
31}
32
33impl<T> Drop for RcuGuard<'_, T> {
34 fn drop(&mut self) {
35 // Try to decrement ptr_counter_latest first
36 loop {
37 let ptr_counter = self.cell.ptr_counter_latest.load(Ordering::Acquire);
38 if (ptr_counter >> 64) as usize == self.ptr.as_ptr() as usize {
39 if self
40 .cell
41 .ptr_counter_latest
42 .compare_exchange_weak(
43 ptr_counter,
44 ptr_counter - 1,
45 Ordering::AcqRel,
46 Ordering::Relaxed,
47 )
48 .is_ok()
49 {
50 return;
51 }
52 } else {
53 // ptr_counter_latest has been updated, so we can't decrement it
54 break;
55 }
56 hint::spin_loop();
57 }
58 // Decrement ptr_counter_to_clear
59 loop {
60 let ptr_counter = self.cell.ptr_counter_to_clear.load(Ordering::Acquire);
61 if (ptr_counter >> 64) as usize == self.ptr.as_ptr() as usize
62 && self
63 .cell
64 .ptr_counter_to_clear
65 .compare_exchange_weak(
66 ptr_counter,
67 ptr_counter - 1,
68 Ordering::AcqRel,
69 Ordering::Relaxed,
70 )
71 .is_ok()
72 {
73 return;
74 }
75 hint::spin_loop();
76 }
77 }
78}
79
80/// A concurrent data structure that allows for safe, read-copy-update (RCU)
81/// style access to its value.
82pub struct RcuCell<T> {
83 ptr_counter_latest: AtomicU128,
84 ptr_counter_to_clear: AtomicU128,
85 data: PhantomData<T>,
86 update_token: RwLock<()>,
87}
88
89impl<T> RcuCell<T> {
90 /// Creates a new `RcuCell` with the given initial value.
91 ///
92 /// This function initializes a new `RcuCell` instance, setting its
93 /// initial value to the provided `value`.
94 ///
95 /// # Arguments
96 ///
97 /// * `value` - The initial value to store in the `RcuCell`.
98 ///
99 /// # Returns
100 ///
101 /// A new instance of `RcuCell` containing the provided initial value.
102 ///
103 /// # Example
104 ///
105 /// ```
106 /// let rcu_cell = rcu_128::RcuCell::new(42);
107 /// ```
108 pub fn new(value: T) -> Self {
109 Self {
110 ptr_counter_latest: AtomicU128::new((Box::into_raw(Box::new(value)) as u128) << 64),
111 ptr_counter_to_clear: AtomicU128::new(0),
112 data: PhantomData,
113 update_token: RwLock::new(()),
114 }
115 }
116
117 /// Provides read access to the value stored in the `RcuCell`.
118 ///
119 /// This function returns an `RcuGuard`, which allows for safe,
120 /// concurrent read access to the `RcuCell`'s value.
121 ///
122 /// Once all `RcuGuard` instances referencing a particular value are
123 /// dropped, the value can be safely released during an update or write.
124 ///
125 /// # Example
126 ///
127 /// ```
128 /// let rcu_cell = rcu_128::RcuCell::new(42);
129 /// {
130 /// let guard = rcu_cell.read();
131 /// assert_eq!(*guard, 42);
132 /// }
133 /// ```
134 pub fn read(&self) -> RcuGuard<'_, T> {
135 let ptr = unsafe {
136 NonNull::new_unchecked(
137 (self.ptr_counter_latest.fetch_add(1, Ordering::AcqRel) >> 64) as usize as *mut T,
138 )
139 };
140 RcuGuard { cell: self, ptr }
141 }
142
143 /// Writes a new value into the `RcuCell`.
144 ///
145 /// This function immediately writes the new value into the `RcuCell`.
146 /// It will block until all current readers have finished reading
147 /// the old value.
148 ///
149 /// Once all readers have completed their read operations, the
150 /// old value will be safely released.
151 ///
152 /// # Arguments
153 ///
154 /// * `value` - The new value to store in the `RcuCell`.
155 ///
156 /// # Example
157 ///
158 /// ```
159 /// let rcu_cell = rcu_128::RcuCell::new(42);
160 /// rcu_cell.write(100);
161 /// {
162 /// let guard = rcu_cell.read();
163 /// assert_eq!(*guard, 100);
164 /// }
165 /// ```
166 pub fn write(&self, value: T) {
167 let new_ptr_counter = (Box::into_raw(Box::new(value)) as u128) << 64;
168 let token_shared = self.update_token.read();
169 let old_ptr_counter = self
170 .ptr_counter_latest
171 .swap(new_ptr_counter, Ordering::AcqRel);
172 drop(token_shared);
173 self.clear(old_ptr_counter);
174 }
175
176 /// Updates the value stored in the `RcuCell` using a provided function.
177 ///
178 /// This function applies the given closure `f` to the current value
179 /// stored in the `RcuCell`, replacing it with the new value returned
180 /// by the closure. It will block until all current readers have finished
181 /// reading the old value.
182 ///
183 /// Once all readers have completed their read operations, the old value
184 /// will be safely released.
185 ///
186 /// # Arguments
187 ///
188 /// * `f` - A closure that takes a reference to the current value and returns
189 /// a new value to store in the `RcuCell`.
190 ///
191 /// # Example
192 ///
193 /// ```
194 /// let rcu_cell = rcu_128::RcuCell::new(42);
195 /// rcu_cell.update(|&old_value| old_value + 1);
196 /// {
197 /// let guard = rcu_cell.read();
198 /// assert_eq!(*guard, 43);
199 /// }
200 /// ```
201 pub fn update(&self, f: impl FnOnce(&T) -> T) {
202 let token_exclusive = self.update_token.write();
203 let old_value =
204 unsafe { &*((self.ptr_counter_latest.load(Ordering::Acquire) >> 64) as *const T) };
205 let new_value = f(old_value);
206 let new_ptr_counter = (Box::into_raw(Box::new(new_value)) as u128) << 64;
207 let old_ptr_counter = self
208 .ptr_counter_latest
209 .swap(new_ptr_counter, Ordering::AcqRel);
210 drop(token_exclusive);
211 self.clear(old_ptr_counter);
212 }
213
214 /// Clears the old value from memory once it is no longer needed.
215 ///
216 /// This function is called internally to release the memory of the old
217 /// value after it has been replaced by a new value. It ensures that all
218 /// readers have completed their read operations on the old value before
219 /// freeing the memory.
220 ///
221 /// # Arguments
222 ///
223 /// * `old_ptr_counter` - The old pointer and counter value to be cleared.
224 ///
225 /// This function does not need to be called directly by users of the
226 /// `RcuCell`.
227 fn clear(&self, old_ptr_counter: u128) {
228 if old_ptr_counter & 0xffff_ffff_ffff_ffff == 0 {
229 // No reader, release memory directly
230 unsafe {
231 let _ = Box::from_raw((old_ptr_counter >> 64) as usize as *mut T);
232 }
233 return;
234 }
235
236 // Only one thread can clear ptr_counter_to_clear at the same time
237 while self
238 .ptr_counter_to_clear
239 .compare_exchange_weak(0, old_ptr_counter, Ordering::AcqRel, Ordering::Relaxed)
240 .is_err()
241 {
242 // Inner loop to only get shared memory access (MESI protocal)
243 while self.ptr_counter_to_clear.load(Ordering::Relaxed) != 0 {
244 hint::spin_loop();
245 }
246 }
247
248 // No need to use CAS here because when the counter is 0,
249 // it will not be updated by other threads
250 //
251 // Wait for all readers to finish
252 while self.ptr_counter_to_clear.load(Ordering::Acquire) & 0xffff_ffff_ffff_ffff != 0 {
253 hint::spin_loop();
254 }
255 // Clear ptr_counter_to_clear to allow other writers to release memory
256 self.ptr_counter_to_clear.store(0, Ordering::Release);
257 unsafe {
258 let _ = Box::from_raw((old_ptr_counter >> 64) as usize as *mut T);
259 }
260 }
261}