light_qsbr/local_manager.rs
//! This module provides the [`LocalManager`].
2use std::alloc::{Layout, dealloc};
3use std::cell::UnsafeCell;
4use std::mem::MaybeUninit;
5use std::time::Duration;
6use std::{mem, thread};
7use orengine_utils::clear_with;
8use orengine_utils::hints::{likely, unlikely, unwrap_or_bug_hint, unwrap_or_bug_message_hint};
9use orengine_utils::OrengineInstant;
10use crate::deffered::Deferred;
11use crate::shared_manager::SharedManager;
12
/// A storage of objects that need to be deallocated or dropped.
///
/// Entries are only queued here; they are released when [`Storage::clear`] is called.
struct Storage {
    /// Raw allocations waiting to be freed, each paired with the [`Layout`] that is handed to
    /// [`dealloc`] when the storage is cleared.
    to_deallocate: Vec<(*mut u8, Layout)>,
    /// Deferred calls that are invoked (via `Deferred::call`) when the storage is cleared.
    to_drop: Vec<Deferred>,
}
18
19impl Storage {
20 /// Creates a new empty storage.
21 const fn new() -> Self {
22 Self {
23 to_deallocate: Vec::new(),
24 to_drop: Vec::new(),
25 }
26 }
27
28 /// Clears the storage.
29 ///
30 /// When #[cfg(test)] is enabled, it also increments the number of bytes deallocated.
31 #[allow(unused_variables, reason = "It is used for tests")]
32 fn clear(&mut self, shared_manager: &SharedManager) {
33 clear_with(&mut self.to_deallocate, |(ptr, layout)| unsafe {
34 #[cfg(test)]
35 {
36 shared_manager.increment_bytes_deallocated(layout.size());
37 }
38
39 dealloc(ptr, layout);
40 });
41
42 clear_with(&mut self.to_drop, |f| {
43 f.call();
44 });
45 }
46
47 /// Transfers all objects from `other` to `self`.
48 fn append(&mut self, other: &mut Self) {
49 self.to_deallocate.append(&mut other.to_deallocate);
50 self.to_drop.append(&mut other.to_drop);
51 }
52}
53
// `Storage` holds raw pointers (`*mut u8`), so the compiler implements neither `Send` nor `Sync`
// for it automatically; both traits are asserted manually here.
// NOTE(review): the soundness of these impls depends on how the queued pointers and `Deferred`
// values are produced and consumed, which is not fully visible in this file — confirm.
#[allow(
    clippy::non_send_fields_in_send_ty,
    reason = "We guarantee that it is `Send`"
)]
unsafe impl Send for Storage {}
unsafe impl Sync for Storage {}
60
/// A local manager of objects that need to be deallocated or dropped when it is safe.
///
/// It should be created by some runtime with [`SharedManager::register_new_executor`], and only
/// the runtime can call [`LocalManager::deregister`] and [`LocalManager::maybe_pass_epoch`].
/// The runtime should call [`LocalManager::deregister`] when it is stopped and should call
/// [`LocalManager::maybe_pass_epoch`] periodically (it uses cached time to prevent unnecessary
/// attempts to pass the epoch).
///
/// After the registration, the `LocalManager` is stored in thread-local storage and can be
/// accessed with [`local_manager`].
///
/// It allows scheduling objects to be deallocated or dropped when it is safe.
///
/// Use [`LocalManager::schedule_deallocate`] and [`LocalManager::schedule_deallocate_slice`]
/// to schedule deallocation of objects, and [`LocalManager::schedule_drop`] to schedule dropping
/// of objects.
///
/// # Example
///
/// ```rust
/// use std::cell::Cell;
/// use light_qsbr::{local_manager, SharedManager, orengine_utils::OrengineInstant, LocalManager};
///
/// # struct Runtime { tasks: Vec<Box<dyn FnOnce() + Send + 'static>>, is_stopped: Cell<bool> }
/// # struct LockFreeStack<T> { ptr: *mut T }
/// # struct LockFreeStackNode<T> { ptr: *mut T }
/// # impl<T> LockFreeStackNode<T> {
/// #     fn get_node_ptr(&self) -> *mut LockFreeStackNode<T> {
/// #         unreachable!()
/// #     }
/// # }
/// # impl<T> LockFreeStack<T> {
/// #     fn pop(&self) -> LockFreeStackNode<T> { LockFreeStackNode::<T> { ptr: unreachable!() } }
/// # }
/// #
/// fn start_runtime() {
///     let mut runtime = Runtime { tasks: Vec::new(), is_stopped: Cell::new(false) };
///     let shared_manager = SharedManager::new();
///
///     shared_manager.register_new_executor();
///
///     'runtime: loop {
///         for _ in 0..61 {
///             if let Some(task) = runtime.tasks.pop() {
///                 task();
///             } else {
///                 break 'runtime;
///             }
///         }
///
///         if runtime.is_stopped.get() {
///             break;
///         }
///
///         local_manager().maybe_pass_epoch(OrengineInstant::now()); // Free some memory if it is safe
///     }
///
///     unsafe { LocalManager::deregister() };
/// }
///
/// fn one_of_tasks(lock_free_stack: LockFreeStack<usize>) {
///     let node = lock_free_stack.pop();
///     let ptr_to_deallocate: *mut LockFreeStackNode<usize> = node.get_node_ptr();
///     // It's not safe to release the node right now because other threads can load it but still not read it.
///     unsafe {
///         local_manager() // Get the thread-local LocalManager
///             .schedule_deallocate(ptr_to_deallocate);
///     }
///     // The node will be deallocated when it is safe.
/// }
/// ```
pub struct LocalManager {
    /// The epoch this manager last synchronized with; read from the shared manager on creation
    /// and advanced whenever the manager reacts to an epoch change.
    current_epoch: usize,
    /// The time at which this manager last reacted to an epoch change; compared with the `now`
    /// argument of [`LocalManager::maybe_pass_epoch`] to skip attempts that come too early.
    this_epoch_start: OrengineInstant,
    /// Whether this manager has already reported to the shared manager that it passed
    /// `current_epoch`; reset when the manager reacts to an epoch change.
    was_passed_epoch: bool,
    /// The shared manager this local manager reports to.
    shared_manager: SharedManager,
    /// Set by [`LocalManager::temporary_deregister`] and cleared by
    /// [`LocalManager::resume_after_temporary_deregister`]; while it is set, scheduling and
    /// epoch passing panic.
    was_stopped: bool,

    /// Objects that were moved out of `current_storage` when this manager passed the epoch;
    /// they are released the next time the manager reacts to an epoch change.
    prev_storage: Storage,
    /// Objects scheduled since this manager last passed the epoch.
    current_storage: Storage,
}
142
143impl LocalManager {
144 /// Creates a new `LocalManager`.
145 pub(crate) fn new(shared_manager: &SharedManager) -> Self {
146 Self {
147 current_epoch: shared_manager.current_epoch(),
148 this_epoch_start: unsafe { MaybeUninit::zeroed().assume_init() },
149 was_passed_epoch: false,
150 shared_manager: shared_manager.clone(),
151 was_stopped: false,
152
153 prev_storage: Storage::new(),
154 current_storage: Storage::new(),
155 }
156 }
157
    /// Reports that the [`LocalManager`] was used while it is temporarily deregistered.
    ///
    /// It is kept out of line (`#[cold]`, `#[inline(never)]`) and is only called by the other
    /// methods after they observed `was_stopped == true`.
    ///
    /// # Panics
    ///
    /// Always panics: with the message below when the [`LocalManager`] is stopped, or on the
    /// leading assertion when it is called although the manager is not stopped.
    #[cold]
    #[inline(never)]
    fn handle_stopped(&self) {
        // Sanity check: callers must test `was_stopped` before calling this function.
        assert!(self.was_stopped);

        panic!(
            "`LocalManager` is used after calling `LocalManager::temporary_deregister` \
            and before calling `LocalManager::resume_after_temporary_deregister`."
        )
    }
171
172 /// Returns the number of bytes deallocated since creation of the `SharedManager`.
173 #[cfg(test)]
174 pub(crate) fn bytes_deallocated(&self) -> usize {
175 self.shared_manager.bytes_deallocated()
176 }
177
178 /// Returns the current epoch.
179 pub fn current_epoch(&self) -> usize {
180 self.current_epoch
181 }
182
183 /// Returns a reference to the associated [`SharedManager`].
184 pub fn shared_manager(&self) -> &SharedManager {
185 &self.shared_manager
186 }
187
188 /// Schedules deallocation of an object. It will be deallocated when it is safe.
189 ///
190 /// # Safety
191 ///
192 /// It requires the same safety conditions as [`dealloc`].
193 pub unsafe fn schedule_deallocate<T>(&mut self, ptr: *const T) {
194 if unlikely(self.was_stopped) {
195 self.handle_stopped();
196 }
197
198 self.current_storage
199 .to_deallocate
200 .push((ptr.cast::<u8>().cast_mut(), Layout::new::<T>()));
201 }
202
203 /// Schedules deallocation of the provided slice.
204 /// They will be deallocated when it is safe.
205 ///
206 /// # Safety
207 ///
208 /// It requires the same safety conditions as [`dealloc`].
209 ///
210 /// # Panics
211 ///
212 ///
213 pub unsafe fn schedule_deallocate_slice<T>(&mut self, ptr: *const T, len: usize) {
214 if unlikely(self.was_stopped) {
215 self.handle_stopped();
216 }
217
218 self.current_storage.to_deallocate.push((
219 ptr.cast::<u8>().cast_mut(),
220 unwrap_or_bug_hint(Layout::array::<T>(len)),
221 ));
222 }
223
224 /// Schedules dropping of the provided function.
225 /// It will be dropped when it is safe.
226 ///
227 /// The function can be a closure; therefore, it can be used to drop any object.
228 ///
229 /// # Safety
230 ///
231 /// It requires the same safety conditions as [`mem::ManuallyDrop::drop`].
232 pub unsafe fn schedule_drop<F: FnOnce()>(&mut self, func: F) {
233 if unlikely(self.was_stopped) {
234 self.handle_stopped();
235 }
236
237 self.current_storage.to_drop.push(Deferred::new(func));
238 }
239
240 /// Collects garbage or the previous epoch's storage.
241 pub(crate) fn collect_garbage(&mut self) {
242 self.prev_storage.clear(&self.shared_manager);
243 }
244
245 /// Reacts to the epoch change.
246 fn react_to_epoch_change(&mut self, global_epoch: usize, now: OrengineInstant) {
247 debug_assert_eq!(global_epoch, self.current_epoch + 1);
248
249 self.current_epoch = global_epoch;
250 self.this_epoch_start = now;
251 self.was_passed_epoch = false;
252
253 self.collect_garbage();
254 }
255
    /// Maybe passes the epoch and frees some memory if it is safe.
    ///
    /// This function accepts the current time as a parameter so that it can return early when
    /// the local epoch started too recently, which avoids passing epochs too often.
    ///
    /// While at least one thread doesn't pass the epoch,
    /// all other threads can't free memory.
    ///
    /// # Panics
    ///
    /// Panics if the `LocalManager` is temporarily deregistered.
    pub fn maybe_pass_epoch(&mut self, now: impl Into<OrengineInstant>) {
        // Minimum time that must elapse since the local epoch started before another attempt
        // is made; tests use a much shorter interval.
        #[cfg(not(test))]
        const EXPECTED_EPOCH_DURATION: Duration = Duration::from_millis(10);

        #[cfg(test)]
        const EXPECTED_EPOCH_DURATION: Duration = Duration::from_micros(100);

        let now = now.into();

        if unlikely(self.was_stopped) {
            self.handle_stopped();
        }

        // Fast path: the local epoch started too recently, nothing to do yet.
        if likely(now - self.this_epoch_start < EXPECTED_EPOCH_DURATION) {
            return;
        }

        let global_epoch = self.shared_manager.current_epoch();

        // The global epoch moved ahead of the local one: catch up and release the previous
        // epoch's storage.
        if unlikely(self.current_epoch < global_epoch) {
            debug_assert!(self.was_passed_epoch);

            self.react_to_epoch_change(global_epoch, now);

            return;
        }

        debug_assert_eq!(self.current_epoch, global_epoch);

        // This manager has already reported passing the current epoch; it can only wait for
        // the global epoch to change.
        if likely(self.was_passed_epoch) {
            return;
        }

        self.was_passed_epoch = true;

        // Objects scheduled during this epoch become the "previous" storage; it must be empty
        // here because it is cleared on every epoch change.
        debug_assert!(
            self.prev_storage.to_drop.is_empty() && self.prev_storage.to_deallocate.is_empty()
        );
        mem::swap(&mut self.prev_storage, &mut self.current_storage);

        // Report to the shared manager; `was_changed` tells whether this report advanced the
        // global epoch, in which case the local state is synchronized right away.
        let was_changed = self.shared_manager.executor_passed_epoch();
        if unlikely(was_changed) {
            self.react_to_epoch_change(global_epoch + 1, now);
        }
    }
308
309 /// Deregisters the thread-local `LocalManager`.
310 /// After that the calling [`local_manager`] can cause undefined behavior before the next
311 /// [`registration`].
312 ///
313 /// # Safety
314 ///
315 /// * The thread must be registered in the [`SharedManager`].
316 /// * It is called only once for one [`registration`].
317 /// * After calling this function, the caller doesn't call [`local_manager`] before the next
318 /// [`registration`].
319 ///
320 /// # Panics
321 ///
322 /// It the thread is not registered.
323 ///
324 /// [`registration`]: SharedManager::register_new_executor
325 pub unsafe fn deregister() {
326 struct DeregisterInNewEpochArgs {
327 epoch_at_start: usize,
328 storage: Storage,
329 shared_manager: SharedManager,
330 }
331
332 fn deregister_in_new_epoch(
333 mut args: DeregisterInNewEpochArgs
334 ) {
335 fn wait_new_epoch_and_clear(
336 shared_manager: &SharedManager,
337 mut storage: Storage,
338 epoch_at_start: usize
339 ) {
340 while shared_manager.current_epoch() == epoch_at_start {
341 thread::sleep(Duration::from_millis(1));
342 }
343
344 storage.clear(shared_manager);
345 }
346
347 let is_new_epoch = args.shared_manager.deregister_executor();
348
349 if is_new_epoch {
350 debug_assert_ne!(args.epoch_at_start, args.shared_manager.current_epoch());
351
352 // The executor passed the current epoch and has been deregistered
353 args.storage.clear(&args.shared_manager);
354
355 return;
356 }
357
358 wait_new_epoch_and_clear(&args.shared_manager, args.storage, args.epoch_at_start);
359 }
360
361 let mut local_manager = LOCAL_MANAGER
362 .with(|local_manager_| unsafe {
363 (*local_manager_.get())
364 .take()
365 .expect("LocalManager is not registered in this thread")
366 });
367
368 let epoch_at_start = local_manager.current_epoch;
369 let mut full_storage = mem::replace(&mut local_manager.current_storage, Storage::new());
370
371 full_storage.append(&mut local_manager.prev_storage);
372
373 let mut args = DeregisterInNewEpochArgs {
374 epoch_at_start,
375 storage: full_storage,
376 shared_manager: local_manager
377 .shared_manager
378 .clone(),
379 };
380
381 // Maybe we still have not passed the current epoch
382 if !local_manager.was_passed_epoch {
383 deregister_in_new_epoch(args);
384
385 return;
386 }
387
388 while args.shared_manager.current_epoch() == args.epoch_at_start {
389 thread::sleep(Duration::from_millis(1));
390 }
391
392 args.epoch_at_start += 1;
393
394 deregister_in_new_epoch(args);
395 }
396
397 /// Deregisters the thread-local `LocalManager` without reclaiming memory.
398 ///
399 /// It is the unsafest function in the library.
400 /// It is expected to be called only before the thread is stopped
401 /// and to be __resumed__ by calling [`LocalManager::resume_after_temporary_deregister`]
402 /// after the thread is resumed.
403 ///
404 /// After this function is called and before calling
405 /// [`LocalManager::resume_after_temporary_deregister`], the thread-local `LocalManager`
406 /// is invalid.
407 ///
408 /// You should completely deregister the thread-local `LocalManager` by calling
409 /// [`LocalManager::deregister`] before the thread is terminated.
410 ///
411 /// # Safety
412 ///
413 /// * The thread must be registered in the [`SharedManager`].
414 /// * After calling this function, the [`LocalManager`] is not used before calling
415 /// [`LocalManager::resume_after_temporary_deregister`].
416 /// * [`LocalManager::deregister`] must be called before the thread is terminated.
417 ///
418 /// # Example
419 ///
420 /// ```rust
421 /// use light_qsbr::{local_manager, LocalManager, SharedManager};
422 ///
423 /// let shared_manager = SharedManager::new();
424 ///
425 /// shared_manager.register_new_executor();
426 ///
427 /// // Do some work
428 ///
429 /// // While thread is stopped it can't call `local_manager.maybe_pass_epoch`.
430 /// // But completely deregister is excessive.
431 /// // While `local_manager().temporary_deregister()` is inexpensive
432 /// // and allows other threads to pass epochs.
433 ///
434 /// unsafe { local_manager().temporary_deregister(); }
435 ///
436 /// std::thread::sleep(std::time::Duration::from_secs(1));
437 ///
438 /// unsafe { local_manager().resume_after_temporary_deregister(); }
439 ///
440 /// // Do some work
441 ///
442 /// unsafe { LocalManager::deregister(); }
443 /// ```
444 pub unsafe fn temporary_deregister(&mut self) {
445 self.was_stopped = true;
446
447 self.shared_manager().deregister_executor();
448 }
449
450 /// Resumes the thread-local `LocalManager` after calling [`LocalManager::temporary_deregister`].
451 ///
452 /// # Safety
453 ///
454 /// * The thread must be registered (before calling [`LocalManager::temporary_deregister`])
455 /// in the [`SharedManager`].
456 /// * [`LocalManager::temporary_deregister`] must be called before calling this function.
457 ///
458 /// # Example
459 ///
460 /// You can find an example in [`LocalManager::temporary_deregister`].
461 pub unsafe fn resume_after_temporary_deregister(&mut self) {
462 self.was_stopped = false;
463
464 self.shared_manager().register_executor_again();
465 }
466}
467
thread_local! {
    /// A thread-local [`LocalManager`].
    ///
    /// It starts as `None`; [`LocalManager::deregister`] takes the manager out again, and
    /// [`local_manager`] hands out a mutable reference to the stored value.
    /// The slot is presumably filled during registration in the `SharedManager` — the code
    /// that does so is not in this module.
    pub(crate) static LOCAL_MANAGER: UnsafeCell<Option<LocalManager>> = const { UnsafeCell::new(None) };
}
472
473/// Returns a reference to the thread-local [`LocalManager`].
474///
475/// # Panics
476///
477/// It the thread is not registered.
478///
479/// # Undefined behavior
480///
481/// If the thread is not registered when `cfg(debug_assertions)` is disabled,
482/// it causes undefined behavior.
483pub fn local_manager() -> &'static mut LocalManager {
484 LOCAL_MANAGER
485 .with(|local_manager| unsafe {
486 unwrap_or_bug_message_hint(
487 (*local_manager.get()).as_mut(),
488 "Local manager is not registered in this thread."
489 )
490 })
491}
492
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Condvar, Mutex};

    /// Checks that a temporarily deregistered executor does not stall the others.
    ///
    /// The main thread registers, waits until a worker thread is registered as well, and then
    /// temporarily deregisters itself. The worker must be able to finish its epoch-passing
    /// loop within the timeout, after which both threads deregister completely.
    #[test]
    fn test_temporary_deregister() {
        let shared_manager = SharedManager::new();

        // Flag + condvar pairs: "worker is registered" and "worker finished its loop".
        let started_signal = Arc::new((Mutex::new(false), Condvar::new()));
        let passed_signal = Arc::new((Mutex::new(false), Condvar::new()));
        let worker_started = Arc::clone(&started_signal);
        let worker_passed = Arc::clone(&passed_signal);

        shared_manager.register_new_executor();

        let worker = thread::spawn(move || {
            shared_manager.register_new_executor();

            let (started_flag, started_cvar) = &*worker_started;
            *started_flag.lock().unwrap() = true;
            started_cvar.notify_one();

            for _ in 0..10 {
                local_manager().maybe_pass_epoch(OrengineInstant::now());

                thread::sleep(Duration::from_millis(1));
            }

            let (passed_flag, passed_cvar) = &*worker_passed;
            *passed_flag.lock().unwrap() = true;
            passed_cvar.notify_one();

            unsafe { LocalManager::deregister(); }
        });

        // Wait until the worker is registered.
        let (started_flag, started_cvar) = &*started_signal;
        let started = started_cvar
            .wait_while(started_flag.lock().unwrap(), |is_started| !*is_started)
            .unwrap();

        drop(started);

        unsafe { local_manager().temporary_deregister(); }

        // The worker must finish although this thread no longer passes epochs.
        let (passed_flag, passed_cvar) = &*passed_signal;
        let mut passed = passed_flag.lock().unwrap();
        while !*passed {
            let (guard, timeout_error) = passed_cvar
                .wait_timeout(passed, Duration::from_secs(3))
                .unwrap();

            assert!(!timeout_error.timed_out());

            passed = guard;
        }

        drop(passed);

        unsafe {
            local_manager().resume_after_temporary_deregister();
            LocalManager::deregister();
        }

        worker.join().unwrap();
    }
}
554}