light_qsbr/
local_manager.rs

//! This module provides the [`LocalManager`].
2use std::alloc::{Layout, dealloc};
3use std::cell::UnsafeCell;
4use std::mem::MaybeUninit;
5use std::time::Duration;
6use std::{mem, thread};
7use orengine_utils::clear_with;
8use orengine_utils::hints::{likely, unlikely, unwrap_or_bug_hint, unwrap_or_bug_message_hint};
9use orengine_utils::OrengineInstant;
10use crate::deffered::Deferred;
11use crate::shared_manager::SharedManager;
12
13/// A storage of objects that need to be deallocated or dropped.
14struct Storage {
15    to_deallocate: Vec<(*mut u8, Layout)>,
16    to_drop: Vec<Deferred>,
17}
18
19impl Storage {
20    /// Creates a new empty storage.
21    const fn new() -> Self {
22        Self {
23            to_deallocate: Vec::new(),
24            to_drop: Vec::new(),
25        }
26    }
27
28    /// Clears the storage.
29    ///
30    /// When #[cfg(test)] is enabled, it also increments the number of bytes deallocated.
31    #[allow(unused_variables, reason = "It is used for tests")]
32    fn clear(&mut self, shared_manager: &SharedManager) {
33        clear_with(&mut self.to_deallocate, |(ptr, layout)| unsafe {
34            #[cfg(test)]
35            {
36                shared_manager.increment_bytes_deallocated(layout.size());
37            }
38
39            dealloc(ptr, layout);
40        });
41
42        clear_with(&mut self.to_drop, |f| {
43            f.call();
44        });
45    }
46
47    /// Transfers all objects from `other` to `self`.
48    fn append(&mut self, other: &mut Self) {
49        self.to_deallocate.append(&mut other.to_deallocate);
50        self.to_drop.append(&mut other.to_drop);
51    }
52}
53
54#[allow(
55    clippy::non_send_fields_in_send_ty,
56    reason = "We guarantee that it is `Send`"
57)]
58unsafe impl Send for Storage {}
59unsafe impl Sync for Storage {}
60
61/// A local manager of objects that need to be deallocated or dropped when it is safe.
62///
63/// It should be created by some runtime with [`SharedManager::register_new_executor`], and only
64/// the runtime can call [`LocalManager::deregister`] and [`LocalManager::maybe_pass_epoch`].
65/// The runtime should call [`LocalManager::deregister`] when it is stopped and should call
66/// [`LocalManager::maybe_pass_epoch`] periodically (it uses cached time to prevent unnecessary
67/// attempts to pass the epoch).
68///
69/// After the registration, the `LocalManager` is stored in thread-local storage and can be
70/// accessed with [`local_manager`].
71///
72/// It allows scheduling objects to be deallocated or dropped when it is safe.
73///
74/// Use [`LocalManager::schedule_deallocate`] and [`LocalManager::schedule_deallocate_slice`]
75/// to schedule deallocation of objects, and [`LocalManager::schedule_drop`] to schedule dropping
76/// of objects.
77///
78/// # Example
79///
80/// ```rust
81/// use std::cell::Cell;
82/// use light_qsbr::{local_manager, SharedManager, orengine_utils::OrengineInstant, LocalManager};
83///
84/// # struct Runtime { tasks: Vec<Box<dyn FnOnce() + Send + 'static>>, is_stopped: Cell<bool> }
85/// # struct LockFreeStack<T> { ptr: *mut T }
86/// # struct LockFreeStackNode<T> { ptr: *mut T }
87/// # impl<T> LockFreeStackNode<T> {
88/// #     fn get_node_ptr(&self) -> *mut LockFreeStackNode<T> {
89/// #         unreachable!()
90/// #     }
91/// # }
92/// # impl<T> LockFreeStack<T> {
93/// #     fn pop(&self) -> LockFreeStackNode<T> { LockFreeStackNode::<T> { ptr: unreachable!() } }
94/// # }
95/// #
96/// fn start_runtime() {
97///     let mut runtime = Runtime { tasks: Vec::new(), is_stopped: Cell::new(false) };
98///     let shared_manager = SharedManager::new();
99///
100///     shared_manager.register_new_executor();
101///
102///     'runtime: loop {
103///         for _ in 0..61 {
104///             if let Some(task) = runtime.tasks.pop() {
105///                 task();
106///             } else {
107///                 break 'runtime;
108///             }
109///         }
110///
111///         if runtime.is_stopped.get() {
112///             break;
113///         }
114///
115///         local_manager().maybe_pass_epoch(OrengineInstant::now()); // Free some memory if it is safe
116///     }
117///
118///     unsafe { LocalManager::deregister() };
119/// }
120///
121/// fn one_of_tasks(lock_free_stack: LockFreeStack<usize>) {
122///     let node = lock_free_stack.pop();
123///     let ptr_to_deallocate: *mut LockFreeStackNode<usize> = node.get_node_ptr();
///     // It's not safe to release the node right now: other threads may have loaded its pointer but not read through it yet.
125///     unsafe {
126///         local_manager() // Get the thread-local LocalManager
127///             .schedule_deallocate(ptr_to_deallocate);
128///     }
129///     // The node will be deallocated when it is safe.
130/// }
131/// ```
pub struct LocalManager {
    /// The epoch this manager currently works in. It is initialized from the shared manager
    /// and advanced in `react_to_epoch_change`.
    current_epoch: usize,
    /// The time at which this manager last reacted to an epoch change; `maybe_pass_epoch`
    /// compares the provided time against it.
    this_epoch_start: OrengineInstant,
    /// Whether this manager has already reported passing `current_epoch` to the shared manager.
    was_passed_epoch: bool,
    /// The shared manager this local manager reports to.
    shared_manager: SharedManager,
    /// Set by `temporary_deregister` and reset by `resume_after_temporary_deregister`;
    /// while it is set, scheduling and epoch passing panic.
    was_stopped: bool,

    /// Objects that were moved out of `current_storage` when the epoch was passed;
    /// cleared when the manager reacts to an epoch change.
    prev_storage: Storage,
    /// Objects scheduled since the last time the epoch was passed.
    current_storage: Storage,
}
142
143impl LocalManager {
144    /// Creates a new `LocalManager`.
145    pub(crate) fn new(shared_manager: &SharedManager) -> Self {
146        Self {
147            current_epoch: shared_manager.current_epoch(),
148            this_epoch_start: unsafe { MaybeUninit::zeroed().assume_init() },
149            was_passed_epoch: false,
150            shared_manager: shared_manager.clone(),
151            was_stopped: false,
152
153            prev_storage: Storage::new(),
154            current_storage: Storage::new(),
155        }
156    }
157
158    /// # Panics
159    ///
160    /// Panics if the [`LocalManager`] is stopped.
161    #[cold]
162    #[inline(never)]
163    fn handle_stopped(&self) {
164        assert!(self.was_stopped);
165
166        panic!(
167            "`LocalManager` is used after calling `LocalManager::temporary_deregister` \
168            and before calling `LocalManager::resume_after_temporary_deregister`."
169        )
170    }
171
    /// Returns the number of deallocated bytes as reported by the associated [`SharedManager`].
    ///
    /// Only compiled for tests.
    #[cfg(test)]
    pub(crate) fn bytes_deallocated(&self) -> usize {
        self.shared_manager.bytes_deallocated()
    }
177
    /// Returns the epoch this `LocalManager` currently works in.
    ///
    /// This is the locally cached value; it is not re-read from the [`SharedManager`] here.
    pub fn current_epoch(&self) -> usize {
        self.current_epoch
    }
182
    /// Returns a reference to the [`SharedManager`] this `LocalManager` reports to.
    pub fn shared_manager(&self) -> &SharedManager {
        &self.shared_manager
    }
187
188    /// Schedules deallocation of an object. It will be deallocated when it is safe.
189    ///
190    /// # Safety
191    ///
192    /// It requires the same safety conditions as [`dealloc`].
193    pub unsafe fn schedule_deallocate<T>(&mut self, ptr: *const T) {
194        if unlikely(self.was_stopped) {
195            self.handle_stopped();
196        }
197
198        self.current_storage
199            .to_deallocate
200            .push((ptr.cast::<u8>().cast_mut(), Layout::new::<T>()));
201    }
202
203    /// Schedules deallocation of the provided slice.
204    /// They will be deallocated when it is safe.
205    ///
206    /// # Safety
207    ///
208    /// It requires the same safety conditions as [`dealloc`].
209    ///
210    /// # Panics
211    ///
212    ///
213    pub unsafe fn schedule_deallocate_slice<T>(&mut self, ptr: *const T, len: usize) {
214        if unlikely(self.was_stopped) {
215            self.handle_stopped();
216        }
217
218        self.current_storage.to_deallocate.push((
219            ptr.cast::<u8>().cast_mut(),
220            unwrap_or_bug_hint(Layout::array::<T>(len)),
221        ));
222    }
223
224    /// Schedules dropping of the provided function.
225    /// It will be dropped when it is safe.
226    ///
227    /// The function can be a closure; therefore, it can be used to drop any object.
228    ///
229    /// # Safety
230    ///
231    /// It requires the same safety conditions as [`mem::ManuallyDrop::drop`].
232    pub unsafe fn schedule_drop<F: FnOnce()>(&mut self, func: F) {
233        if unlikely(self.was_stopped) {
234            self.handle_stopped();
235        }
236
237        self.current_storage.to_drop.push(Deferred::new(func));
238    }
239
    /// Collects garbage of the previous epoch's storage.
    ///
    /// Everything held in `prev_storage` is deallocated or called; `current_storage` is not
    /// touched.
    pub(crate) fn collect_garbage(&mut self) {
        self.prev_storage.clear(&self.shared_manager);
    }
244
245    /// Reacts to the epoch change.
246    fn react_to_epoch_change(&mut self, global_epoch: usize, now: OrengineInstant) {
247        debug_assert_eq!(global_epoch, self.current_epoch + 1);
248
249        self.current_epoch = global_epoch;
250        self.this_epoch_start = now;
251        self.was_passed_epoch = false;
252
253        self.collect_garbage();
254    }
255
    /// Maybe passes the epoch and frees some memory if it is safe.
    ///
    /// This function accepts the current time as a parameter so that the epoch is not passed
    /// too often: nothing happens until at least the expected epoch duration
    /// (10 ms, or 100 µs in tests) has elapsed since this manager last reacted to an epoch
    /// change.
    ///
    /// While at least one thread doesn't pass the epoch,
    /// all other threads can't free memory.
    ///
    /// # Panics
    ///
    /// Panics if the `LocalManager` is temporarily deregistered.
    pub fn maybe_pass_epoch(&mut self, now: impl Into<OrengineInstant>) {
        #[cfg(not(test))]
        const EXPECTED_EPOCH_DURATION: Duration = Duration::from_millis(10);

        // Tests use a much shorter epoch so that they do not have to wait.
        #[cfg(test)]
        const EXPECTED_EPOCH_DURATION: Duration = Duration::from_micros(100);

        let now = now.into();

        if unlikely(self.was_stopped) {
            self.handle_stopped();
        }

        // Fast path: the current epoch is still too young, do nothing.
        if likely(now - self.this_epoch_start < EXPECTED_EPOCH_DURATION) {
            return;
        }

        let global_epoch = self.shared_manager.current_epoch();

        // The global epoch moved on since the last call: adopt it and clear the previous storage.
        if unlikely(self.current_epoch < global_epoch) {
            debug_assert!(self.was_passed_epoch);

            self.react_to_epoch_change(global_epoch, now);

            return;
        }

        debug_assert_eq!(self.current_epoch, global_epoch);

        // This manager has already passed the current epoch; nothing more to do until
        // the global epoch changes.
        if likely(self.was_passed_epoch) {
            return;
        }

        self.was_passed_epoch = true;

        // The previous storage was cleared when the current epoch was adopted, so swapping
        // moves everything scheduled during this epoch into `prev_storage` and leaves
        // `current_storage` empty.
        debug_assert!(
            self.prev_storage.to_drop.is_empty() && self.prev_storage.to_deallocate.is_empty()
        );
        mem::swap(&mut self.prev_storage, &mut self.current_storage);

        // The returned flag is treated as "the global epoch has just advanced".
        let was_changed = self.shared_manager.executor_passed_epoch();
        if unlikely(was_changed) {
            self.react_to_epoch_change(global_epoch + 1, now);
        }
    }
308
    /// Deregisters the thread-local `LocalManager` and reclaims everything it still holds.
    ///
    /// The manager is taken out of the thread-local slot, its current and previous storages are
    /// merged, the executor is deregistered from the [`SharedManager`], and the merged storage is
    /// cleared once the global epoch differs from the epoch in which the executor was
    /// deregistered. Waiting is done by sleeping in 1 ms steps, so this call may block.
    ///
    /// After that, calling [`local_manager`] can cause undefined behavior before the next
    /// [`registration`].
    ///
    /// # Safety
    ///
    /// * The thread must be registered in the [`SharedManager`].
    /// * It is called only once for one [`registration`].
    /// * After calling this function, the caller doesn't call [`local_manager`] before the next
    ///   [`registration`].
    ///
    /// # Panics
    ///
    /// If the thread is not registered.
    ///
    /// [`registration`]: SharedManager::register_new_executor
    pub unsafe fn deregister() {
        // Everything `deregister_in_new_epoch` needs, detached from the `LocalManager`.
        struct DeregisterInNewEpochArgs {
            epoch_at_start: usize,
            storage: Storage,
            shared_manager: SharedManager,
        }

        // Deregisters the executor while the global epoch is `args.epoch_at_start` and clears
        // the storage once the global epoch is different from it.
        fn deregister_in_new_epoch(
            mut args: DeregisterInNewEpochArgs
        ) {
            // Sleeps until the global epoch differs from `epoch_at_start`, then clears `storage`.
            fn wait_new_epoch_and_clear(
                shared_manager: &SharedManager,
                mut storage: Storage,
                epoch_at_start: usize
            ) {
                while shared_manager.current_epoch() == epoch_at_start {
                    thread::sleep(Duration::from_millis(1));
                }

                storage.clear(shared_manager);
            }

            // NOTE(review): `was_stopped` is not consulted anywhere in `deregister`, so calling
            // it between `temporary_deregister` and `resume_after_temporary_deregister` calls
            // `deregister_executor` a second time — confirm whether that is supported.
            let is_new_epoch = args.shared_manager.deregister_executor();

            if is_new_epoch {
                debug_assert_ne!(args.epoch_at_start, args.shared_manager.current_epoch());

                // The executor passed the current epoch and has been deregistered
                args.storage.clear(&args.shared_manager);

                return;
            }

            wait_new_epoch_and_clear(&args.shared_manager, args.storage, args.epoch_at_start);
        }

        // Take the manager out of the thread-local slot, leaving `None` behind.
        let mut local_manager = LOCAL_MANAGER
            .with(|local_manager_| unsafe {
                (*local_manager_.get())
                    .take()
                    .expect("LocalManager is not registered in this thread")
            });

        let epoch_at_start = local_manager.current_epoch;
        let mut full_storage = mem::replace(&mut local_manager.current_storage, Storage::new());

        // Both storages are reclaimed together, at the later (safer) point in time.
        full_storage.append(&mut local_manager.prev_storage);

        let mut args = DeregisterInNewEpochArgs {
            epoch_at_start,
            storage: full_storage,
            shared_manager: local_manager
                .shared_manager
                .clone(),
        };

        // Maybe we still have not passed the current epoch
        if !local_manager.was_passed_epoch {
            deregister_in_new_epoch(args);

            return;
        }

        // The current epoch has already been passed by this executor: wait until the global
        // epoch changes, and only then deregister in the following epoch.
        while args.shared_manager.current_epoch() == args.epoch_at_start {
            thread::sleep(Duration::from_millis(1));
        }

        args.epoch_at_start += 1;

        deregister_in_new_epoch(args);
    }
396
    /// Deregisters the thread-local `LocalManager` without reclaiming memory.
    ///
    /// It marks the manager as stopped and deregisters the executor from the [`SharedManager`];
    /// the scheduled objects stay in the manager's storages.
    ///
    /// It is the most unsafe function in the library.
    /// It is expected to be called only before the thread is stopped
    /// and to be __resumed__ by calling [`LocalManager::resume_after_temporary_deregister`]
    /// after the thread is resumed.
    ///
    /// After this function is called and before calling
    /// [`LocalManager::resume_after_temporary_deregister`], the thread-local `LocalManager`
    /// is invalid: scheduling and [`LocalManager::maybe_pass_epoch`] panic.
    ///
    /// You should completely deregister the thread-local `LocalManager` by calling
    /// [`LocalManager::deregister`] before the thread is terminated.
    ///
    /// # Safety
    ///
    /// * The thread must be registered in the [`SharedManager`].
    /// * After calling this function, the [`LocalManager`] is not used before calling
    ///   [`LocalManager::resume_after_temporary_deregister`].
    /// * [`LocalManager::deregister`] must be called before the thread is terminated.
    ///
    /// # Example
    ///
    /// ```rust
    /// use light_qsbr::{local_manager, LocalManager, SharedManager};
    ///
    /// let shared_manager = SharedManager::new();
    ///
    /// shared_manager.register_new_executor();
    ///
    /// // Do some work
    ///
    /// // While the thread is stopped, it can't call `local_manager().maybe_pass_epoch`.
    /// // But a complete deregistration is excessive,
    /// // while `local_manager().temporary_deregister()` is inexpensive
    /// // and allows other threads to pass epochs.
    ///
    /// unsafe { local_manager().temporary_deregister(); }
    ///
    /// std::thread::sleep(std::time::Duration::from_secs(1));
    ///
    /// unsafe { local_manager().resume_after_temporary_deregister(); }
    ///
    /// // Do some work
    ///
    /// unsafe { LocalManager::deregister(); }
    /// ```
    pub unsafe fn temporary_deregister(&mut self) {
        self.was_stopped = true;

        // NOTE(review): unlike `deregister`, this neither looks at `was_passed_epoch` nor uses
        // the value returned by `deregister_executor` — confirm that the shared accounting
        // tolerates deregistering an executor that has already passed the current epoch.
        self.shared_manager().deregister_executor();
    }
449
450    /// Resumes the thread-local `LocalManager` after calling [`LocalManager::temporary_deregister`].
451    ///
452    /// # Safety
453    ///
454    /// * The thread must be registered (before calling [`LocalManager::temporary_deregister`])
455    ///   in the [`SharedManager`].
456    /// * [`LocalManager::temporary_deregister`] must be called before calling this function.
457    ///
458    /// # Example
459    ///
460    /// You can find an example in [`LocalManager::temporary_deregister`].
461    pub unsafe fn resume_after_temporary_deregister(&mut self) {
462        self.was_stopped = false;
463
464        self.shared_manager().register_executor_again();
465    }
466}
467
thread_local! {
    /// A thread-local [`LocalManager`].
    ///
    /// It starts as `None`. `LocalManager::deregister` takes the value out again, and
    /// `local_manager` hands out mutable references to it, which is why it lives in an
    /// `UnsafeCell`. Presumably it is filled during registration, which is not part of this
    /// module.
    pub(crate) static LOCAL_MANAGER: UnsafeCell<Option<LocalManager>> = const { UnsafeCell::new(None) };
}
472
473/// Returns a reference to the thread-local [`LocalManager`].
474///
475/// # Panics
476///
477/// It the thread is not registered.
478///
479/// # Undefined behavior
480///
481/// If the thread is not registered when `cfg(debug_assertions)` is disabled,
482/// it causes undefined behavior.
483pub fn local_manager() -> &'static mut LocalManager {
484    LOCAL_MANAGER
485        .with(|local_manager| unsafe {
486            unwrap_or_bug_message_hint(
487                (*local_manager.get()).as_mut(),
488                "Local manager is not registered in this thread."
489            )
490        })
491}
492
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Condvar, Mutex};

    /// Checks that a temporarily deregistered executor does not block the others.
    ///
    /// The main thread registers, waits for the worker to register too and then temporarily
    /// deregisters itself. The worker must be able to finish its `maybe_pass_epoch` loop and
    /// signal completion within the timeout while the main thread is away.
    #[test]
    fn test_temporary_deregister() {
        let shared_manager = SharedManager::new();
        let started_signal = Arc::new((Mutex::new(false), Condvar::new()));
        let passed_signal = Arc::new((Mutex::new(false), Condvar::new()));
        let worker_started = Arc::clone(&started_signal);
        let worker_passed = Arc::clone(&passed_signal);

        shared_manager.register_new_executor();

        let worker = thread::spawn(move || {
            shared_manager.register_new_executor();

            {
                let (flag, condvar) = &*worker_started;

                *flag.lock().unwrap() = true;
                condvar.notify_one();
            }

            for _ in 0..10 {
                local_manager().maybe_pass_epoch(OrengineInstant::now());

                thread::sleep(Duration::from_millis(1));
            }

            {
                let (flag, condvar) = &*worker_passed;

                *flag.lock().unwrap() = true;
                condvar.notify_one();
            }

            unsafe {
                LocalManager::deregister();
            }
        });

        // Wait until the worker is registered; the guard is released at the end of the scope.
        {
            let (flag, condvar) = &*started_signal;
            let mut started = flag.lock().unwrap();

            while !*started {
                started = condvar.wait(started).unwrap();
            }
        }

        unsafe {
            local_manager().temporary_deregister();
        }

        // The worker has to finish on its own while this thread is deregistered.
        {
            let (flag, condvar) = &*passed_signal;
            let mut passed = flag.lock().unwrap();

            while !*passed {
                let (guard, timeout_error) = condvar
                    .wait_timeout(passed, Duration::from_secs(3))
                    .unwrap();
                passed = guard;

                assert!(!timeout_error.timed_out());
            }
        }

        unsafe {
            local_manager().resume_after_temporary_deregister();
            LocalManager::deregister();
        }

        worker.join().unwrap();
    }
}