light_qsbr/local_manager.rs
//! This module provides the [`LocalManager`].

use std::alloc::{Layout, dealloc};
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::time::Duration;
use std::{mem, thread};
use orengine_utils::clear_with;
use orengine_utils::hints::{likely, unlikely, unwrap_or_bug_hint, unwrap_or_bug_message_hint};
use orengine_utils::instant::OrengineInstant;
use crate::deffered::Deferred;
use crate::shared_manager::SharedManager;

/// A storage of objects that need to be deallocated or dropped.
struct Storage {
    to_deallocate: Vec<(*mut u8, Layout)>,
    to_drop: Vec<Deferred>,
}

impl Storage {
    /// Creates a new empty storage.
    const fn new() -> Self {
        Self {
            to_deallocate: Vec::new(),
            to_drop: Vec::new(),
        }
    }

    /// Clears the storage.
    ///
    /// When `#[cfg(test)]` is enabled, it also increments the number of deallocated bytes
    /// in the [`SharedManager`].
    #[allow(unused_variables, reason = "It is used for tests")]
    fn clear(&mut self, shared_manager: &SharedManager) {
        clear_with(&mut self.to_deallocate, |(ptr, layout)| unsafe {
            #[cfg(test)]
            {
                shared_manager.increment_bytes_deallocated(layout.size());
            }

            dealloc(ptr, layout);
        });

        clear_with(&mut self.to_drop, |f| {
            f.call();
        });
    }

    /// Transfers all objects from `other` to `self`.
    fn append(&mut self, other: &mut Self) {
        self.to_deallocate.append(&mut other.to_deallocate);
        self.to_drop.append(&mut other.to_drop);
    }
}

#[allow(
    clippy::non_send_fields_in_send_ty,
    reason = "We guarantee that it is `Send`"
)]
unsafe impl Send for Storage {}
unsafe impl Sync for Storage {}

/// A local manager of objects that need to be deallocated or dropped when it is safe.
///
/// It should be created by some runtime with [`SharedManager::register_new_executor`], and only
/// the runtime can call [`LocalManager::deregister`] and [`LocalManager::maybe_pass_epoch`].
/// The runtime should call [`LocalManager::deregister`] when it is stopped and should call
/// [`LocalManager::maybe_pass_epoch`] periodically (it uses cached time to prevent unnecessary
/// attempts to pass the epoch).
///
/// After the registration, the `LocalManager` is stored in thread-local storage and can be
/// accessed with [`local_manager`].
///
/// It allows scheduling objects to be deallocated or dropped when it is safe.
///
/// Use [`LocalManager::schedule_deallocate`] and [`LocalManager::schedule_deallocate_slice`]
/// to schedule deallocation of objects, and [`LocalManager::schedule_drop`] to schedule dropping
/// of objects.
///
/// # Example
///
/// ```rust
/// use std::cell::Cell;
/// use light_qsbr::{local_manager, SharedManager, orengine_utils::instant::OrengineInstant, LocalManager};
///
/// # struct Runtime { tasks: Vec<Box<dyn FnOnce() + Send + 'static>>, is_stopped: Cell<bool> }
/// # struct LockFreeStack<T> { ptr: *mut T }
/// # struct LockFreeStackNode<T> { ptr: *mut T }
/// # impl<T> LockFreeStackNode<T> {
/// #     fn get_node_ptr(&self) -> *mut LockFreeStackNode<T> {
/// #         unreachable!()
/// #     }
/// # }
/// # impl<T> LockFreeStack<T> {
/// #     fn pop(&self) -> LockFreeStackNode<T> { LockFreeStackNode::<T> { ptr: unreachable!() } }
/// # }
/// #
/// fn start_runtime() {
///     let mut runtime = Runtime { tasks: Vec::new(), is_stopped: Cell::new(false) };
///     let shared_manager = SharedManager::new();
///
///     shared_manager.register_new_executor();
///
///     'runtime: loop {
///         for _ in 0..61 {
///             if let Some(task) = runtime.tasks.pop() {
///                 task();
///             } else {
///                 break 'runtime;
///             }
///         }
///
///         if runtime.is_stopped.get() {
///             break;
///         }
///
///         local_manager().maybe_pass_epoch(OrengineInstant::now()); // Free some memory if it is safe
///     }
///
///     unsafe { LocalManager::deregister() };
/// }
///
/// fn one_of_tasks(lock_free_stack: LockFreeStack<usize>) {
///     let node = lock_free_stack.pop();
///     let ptr_to_deallocate: *mut LockFreeStackNode<usize> = node.get_node_ptr();
///     // It's not safe to release the node right now: other threads may have already
///     // loaded this pointer but not yet finished reading it.
///     unsafe {
///         local_manager() // Get the thread-local LocalManager
///             .schedule_deallocate(ptr_to_deallocate);
///     }
///     // The node will be deallocated when it is safe.
/// }
/// ```
pub struct LocalManager {
    current_epoch: usize,
    this_epoch_start: OrengineInstant,
    was_passed_epoch: bool,
    shared_manager: SharedManager,

    prev_storage: Storage,
    current_storage: Storage,
}

impl LocalManager {
    /// Creates a new `LocalManager`.
    pub(crate) fn new(shared_manager: &SharedManager) -> Self {
        Self {
            current_epoch: shared_manager.current_epoch(),
            // Zeroed placeholder; it is overwritten with the real start time
            // when the first epoch change is handled.
            this_epoch_start: unsafe { MaybeUninit::zeroed().assume_init() },
            was_passed_epoch: false,
            shared_manager: shared_manager.clone(),

            prev_storage: Storage::new(),
            current_storage: Storage::new(),
        }
    }

    /// Returns the number of bytes deallocated since creation of the `SharedManager`.
    #[cfg(test)]
    pub(crate) fn bytes_deallocated(&self) -> usize {
        self.shared_manager.bytes_deallocated()
    }

    /// Returns the current epoch.
    pub fn current_epoch(&self) -> usize {
        self.current_epoch
    }

    /// Returns a reference to the associated [`SharedManager`].
    pub fn shared_manager(&self) -> &SharedManager {
        &self.shared_manager
    }

    /// Schedules deallocation of an object. It will be deallocated when it is safe.
    ///
    /// # Safety
    ///
    /// It requires the same safety conditions as [`dealloc`].
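    ///
    /// # Example
    ///
    /// A minimal, illustrative sketch; in practice registration and deregistration are
    /// performed by the runtime, as in the [`LocalManager`] example:
    ///
    /// ```rust
    /// use light_qsbr::{local_manager, LocalManager, SharedManager};
    ///
    /// fn retire_value() {
    ///     SharedManager::new().register_new_executor();
    ///
    ///     // A value allocated through the global allocator with `Layout::new::<u64>()`.
    ///     let ptr: *mut u64 = Box::into_raw(Box::new(42_u64));
    ///
    ///     unsafe {
    ///         // The memory is released only after every registered executor
    ///         // has passed the current epoch.
    ///         local_manager().schedule_deallocate(ptr);
    ///     }
    ///
    ///     unsafe { LocalManager::deregister() };
    /// }
    /// ```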
    pub unsafe fn schedule_deallocate<T>(&mut self, ptr: *const T) {
        self.current_storage
            .to_deallocate
            .push((ptr.cast::<u8>().cast_mut(), Layout::new::<T>()));
    }

    /// Schedules deallocation of the provided slice.
    /// It will be deallocated when it is safe.
    ///
    /// # Safety
    ///
    /// It requires the same safety conditions as [`dealloc`].
    ///
    /// # Panics
    ///
    /// If [`Layout::array`] fails for the provided `T` and `len`
    /// (that is, if the total size of the slice overflows).
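    ///
    /// # Example
    ///
    /// A minimal, illustrative sketch; in practice registration and deregistration are
    /// performed by the runtime, as in the [`LocalManager`] example:
    ///
    /// ```rust
    /// use std::alloc::{Layout, alloc};
    /// use light_qsbr::{local_manager, LocalManager, SharedManager};
    ///
    /// fn retire_buffer() {
    ///     SharedManager::new().register_new_executor();
    ///
    ///     // A buffer of 8 `u32`s allocated through the global allocator.
    ///     let layout = Layout::array::<u32>(8).unwrap();
    ///     let ptr = unsafe { alloc(layout) }.cast::<u32>();
    ///     assert!(!ptr.is_null());
    ///
    ///     unsafe {
    ///         // `T` and `len` must describe the original allocation, because
    ///         // the layout is recomputed from them before deallocation.
    ///         local_manager().schedule_deallocate_slice(ptr, 8);
    ///     }
    ///
    ///     unsafe { LocalManager::deregister() };
    /// }
    /// ```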
    pub unsafe fn schedule_deallocate_slice<T>(&mut self, ptr: *const T, len: usize) {
        self.current_storage.to_deallocate.push((
            ptr.cast::<u8>().cast_mut(),
            unwrap_or_bug_hint(Layout::array::<T>(len)),
        ));
    }

    /// Schedules the provided function to be called when it is safe.
    ///
    /// The function can be a closure; therefore, it can be used to drop any object.
    ///
    /// # Safety
    ///
    /// It requires the same safety conditions as [`mem::ManuallyDrop::drop`].
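    ///
    /// # Example
    ///
    /// A minimal, illustrative sketch; in practice registration and deregistration are
    /// performed by the runtime, as in the [`LocalManager`] example:
    ///
    /// ```rust
    /// use light_qsbr::{local_manager, LocalManager, SharedManager};
    ///
    /// fn retire_string() {
    ///     SharedManager::new().register_new_executor();
    ///
    ///     // A heap value that concurrent readers may still be using.
    ///     let raw: *mut String = Box::into_raw(Box::new(String::from("shared")));
    ///
    ///     // The deferred destructor rebuilds the `Box` and drops it.
    ///     // SAFETY: `raw` came from `Box::into_raw` and is not used afterwards.
    ///     let destructor = move || unsafe { drop(Box::from_raw(raw)) };
    ///
    ///     unsafe {
    ///         // The closure is called only after every registered executor
    ///         // has passed the current epoch.
    ///         local_manager().schedule_drop(destructor);
    ///     }
    ///
    ///     unsafe { LocalManager::deregister() };
    /// }
    /// ```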
    pub unsafe fn schedule_drop<F: FnOnce()>(&mut self, func: F) {
        self.current_storage.to_drop.push(Deferred::new(func));
    }

    /// Collects the garbage stored for the previous epoch.
    pub(crate) fn collect_garbage(&mut self) {
        self.prev_storage.clear(&self.shared_manager);
    }

    /// Reacts to the epoch change.
    fn react_to_epoch_change(&mut self, global_epoch: usize, now: OrengineInstant) {
        debug_assert_eq!(global_epoch, self.current_epoch + 1);

        self.current_epoch = global_epoch;
        self.this_epoch_start = now;
        self.was_passed_epoch = false;

        self.collect_garbage();
    }

    /// Maybe passes the epoch and frees some memory if it is safe.
    ///
    /// This function accepts the current time as a parameter so that it can
    /// cheaply skip attempts to pass the epoch too often.
    ///
    /// While at least one thread has not passed the epoch,
    /// no other thread can free memory.
    pub fn maybe_pass_epoch(&mut self, now: OrengineInstant) {
        #[cfg(not(test))]
        const EXPECTED_EPOCH_DURATION: Duration = Duration::from_millis(10);

        #[cfg(test)]
        const EXPECTED_EPOCH_DURATION: Duration = Duration::from_micros(100);

        if likely(now - self.this_epoch_start < EXPECTED_EPOCH_DURATION) {
            return;
        }

        let global_epoch = self.shared_manager.current_epoch();

        if unlikely(self.current_epoch < global_epoch) {
            debug_assert!(self.was_passed_epoch);

            self.react_to_epoch_change(global_epoch, now);

            return;
        }

        debug_assert_eq!(self.current_epoch, global_epoch);

        if likely(self.was_passed_epoch) {
            return;
        }

        self.was_passed_epoch = true;

        debug_assert!(
            self.prev_storage.to_drop.is_empty() && self.prev_storage.to_deallocate.is_empty()
        );
        // Garbage scheduled during this epoch becomes the previous epoch's garbage
        // and is freed once the next epoch is reached.
        mem::swap(&mut self.prev_storage, &mut self.current_storage);

        // `executor_passed_epoch` returns `true` when this call advanced the global epoch.
        let was_changed = self.shared_manager.executor_passed_epoch();
        if unlikely(was_changed) {
            self.react_to_epoch_change(global_epoch + 1, now);
        }
    }

    /// Deregisters the thread-local `LocalManager`.
    /// After that, calling [`local_manager`] can cause undefined behavior until the next
    /// [`registration`].
    ///
    /// # Safety
    ///
    /// * The thread must be registered in the [`SharedManager`].
    /// * It is called only once per [`registration`].
    /// * After calling this function, the caller doesn't call [`local_manager`] until the next
    ///   [`registration`].
    ///
    /// # Panics
    ///
    /// If the thread is not registered.
    ///
    /// [`registration`]: SharedManager::register_new_executor
    pub unsafe fn deregister() {
        struct DeregisterInNewEpochArgs {
            epoch_at_start: usize,
            storage: Storage,
            shared_manager: SharedManager,
        }

        fn deregister_in_new_epoch(mut args: DeregisterInNewEpochArgs) {
            fn wait_new_epoch_and_clear(
                shared_manager: &SharedManager,
                mut storage: Storage,
                epoch_at_start: usize,
            ) {
                while shared_manager.current_epoch() == epoch_at_start {
                    thread::sleep(Duration::from_millis(1));
                }

                storage.clear(shared_manager);
            }

            let is_new_epoch = args.shared_manager.deregister_executor();

            if is_new_epoch {
                debug_assert_ne!(args.epoch_at_start, args.shared_manager.current_epoch());

                // The executor passed the current epoch and has been deregistered
                args.storage.clear(&args.shared_manager);

                return;
            }

            wait_new_epoch_and_clear(&args.shared_manager, args.storage, args.epoch_at_start);
        }

        let mut local_manager = LOCAL_MANAGER.with(|local_manager_| unsafe {
            (*local_manager_.get())
                .take()
                .expect("LocalManager is not registered in this thread")
        });

        let epoch_at_start = local_manager.current_epoch;
        let mut full_storage = mem::replace(&mut local_manager.current_storage, Storage::new());

        full_storage.append(&mut local_manager.prev_storage);

        let mut args = DeregisterInNewEpochArgs {
            epoch_at_start,
            storage: full_storage,
            shared_manager: local_manager.shared_manager.clone(),
        };

        // Maybe we still have not passed the current epoch
        if !local_manager.was_passed_epoch {
            deregister_in_new_epoch(args);

            return;
        }

        while args.shared_manager.current_epoch() == args.epoch_at_start {
            thread::sleep(Duration::from_millis(1));
        }

        args.epoch_at_start += 1;

        deregister_in_new_epoch(args);
    }
}

thread_local! {
    /// A thread-local [`LocalManager`].
    pub(crate) static LOCAL_MANAGER: UnsafeCell<Option<LocalManager>> = const { UnsafeCell::new(None) };
}

/// Returns a reference to the thread-local [`LocalManager`].
///
/// # Panics
///
/// If the thread is not registered.
///
/// # Undefined behavior
///
/// If the thread is not registered when `cfg(debug_assertions)` is disabled,
/// it causes undefined behavior.
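///
/// # Example
///
/// A minimal, illustrative sketch (registration is normally performed by the runtime);
/// right after registration the local epoch mirrors the global one:
///
/// ```rust
/// use light_qsbr::{local_manager, LocalManager, SharedManager};
///
/// fn inspect_epoch() {
///     SharedManager::new().register_new_executor();
///
///     assert_eq!(
///         local_manager().current_epoch(),
///         local_manager().shared_manager().current_epoch(),
///     );
///
///     unsafe { LocalManager::deregister() };
/// }
/// ```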
pub fn local_manager() -> &'static mut LocalManager {
    LOCAL_MANAGER.with(|local_manager| unsafe {
        unwrap_or_bug_message_hint(
            (*local_manager.get()).as_mut(),
            "Local manager is not registered in this thread.",
        )
    })
}