light_qsbr/
local_manager.rs

1/// This module provides the [`LocalManager`].
2use std::alloc::{Layout, dealloc};
3use std::cell::UnsafeCell;
4use std::mem::MaybeUninit;
5use std::time::Duration;
6use std::{mem, thread};
7use orengine_utils::clear_with;
8use orengine_utils::hints::{likely, unlikely, unwrap_or_bug_hint, unwrap_or_bug_message_hint};
9use orengine_utils::instant::OrengineInstant;
10use crate::deffered::Deferred;
11use crate::shared_manager::SharedManager;
12
13/// A storage of objects that need to be deallocated or dropped.
14struct Storage {
15    to_deallocate: Vec<(*mut u8, Layout)>,
16    to_drop: Vec<Deferred>,
17}
18
19impl Storage {
20    /// Creates a new empty storage.
21    const fn new() -> Self {
22        Self {
23            to_deallocate: Vec::new(),
24            to_drop: Vec::new(),
25        }
26    }
27
28    /// Clears the storage.
29    ///
30    /// When #[cfg(test)] is enabled, it also increments the number of bytes deallocated.
31    #[allow(unused_variables, reason = "It is used for tests")]
32    fn clear(&mut self, shared_manager: &SharedManager) {
33        clear_with(&mut self.to_deallocate, |(ptr, layout)| unsafe {
34            #[cfg(test)]
35            {
36                shared_manager.increment_bytes_deallocated(layout.size());
37            }
38
39            dealloc(ptr, layout);
40        });
41
42        clear_with(&mut self.to_drop, |f| {
43            f.call();
44        });
45    }
46
47    /// Transfers all objects from `other` to `self`.
48    fn append(&mut self, other: &mut Self) {
49        self.to_deallocate.append(&mut other.to_deallocate);
50        self.to_drop.append(&mut other.to_drop);
51    }
52}
53
54#[allow(
55    clippy::non_send_fields_in_send_ty,
56    reason = "We guarantee that it is `Send`"
57)]
58unsafe impl Send for Storage {}
59unsafe impl Sync for Storage {}
60
61/// A local manager of objects that need to be deallocated or dropped when it is safe.
62///
63/// It should be created by some runtime with [`SharedManager::register_new_executor`], and only
64/// the runtime can call [`LocalManager::deregister`] and [`LocalManager::maybe_pass_epoch`].
65/// The runtime should call [`LocalManager::deregister`] when it is stopped and should call
66/// [`LocalManager::maybe_pass_epoch`] periodically (it uses cached time to prevent unnecessary
67/// attempts to pass the epoch).
68///
69/// After the registration, the `LocalManager` is stored in thread-local storage and can be
70/// accessed with [`local_manager`].
71///
72/// It allows scheduling objects to be deallocated or dropped when it is safe.
73///
74/// Use [`LocalManager::schedule_deallocate`] and [`LocalManager::schedule_deallocate_slice`]
75/// to schedule deallocation of objects, and [`LocalManager::schedule_drop`] to schedule dropping
76/// of objects.
77///
78/// # Example
79///
80/// ```rust
81/// use std::cell::Cell;
82/// use light_qsbr::{local_manager, SharedManager, orengine_utils::instant::OrengineInstant, LocalManager};
83///
84/// # struct Runtime { tasks: Vec<Box<dyn FnOnce() + Send + 'static>>, is_stopped: Cell<bool> }
85/// # struct LockFreeStack<T> { ptr: *mut T }
86/// # struct LockFreeStackNode<T> { ptr: *mut T }
87/// # impl<T> LockFreeStackNode<T> {
88/// #     fn get_node_ptr(&self) -> *mut LockFreeStackNode<T> {
89/// #         unreachable!()
90/// #     }
91/// # }
92/// # impl<T> LockFreeStack<T> {
93/// #     fn pop(&self) -> LockFreeStackNode<T> { LockFreeStackNode::<T> { ptr: unreachable!() } }
94/// # }
95/// #
96/// fn start_runtime() {
97///     let mut runtime = Runtime { tasks: Vec::new(), is_stopped: Cell::new(false) };
98///     let shared_manager = SharedManager::new();
99///
100///     shared_manager.register_new_executor();
101///
102///     'runtime: loop {
103///         for _ in 0..61 {
104///             if let Some(task) = runtime.tasks.pop() {
105///                 task();
106///             } else {
107///                 break 'runtime;
108///             }
109///         }
110///
111///         if runtime.is_stopped.get() {
112///             break;
113///         }
114///
115///         local_manager().maybe_pass_epoch(OrengineInstant::now()); // Free some memory if it is safe
116///     }
117///
118///     unsafe { LocalManager::deregister() };
119/// }
120///
121/// fn one_of_tasks(lock_free_stack: LockFreeStack<usize>) {
122///     let node = lock_free_stack.pop();
123///     let ptr_to_deallocate: *mut LockFreeStackNode<usize> = node.get_node_ptr();
124///     // It's not safe to release the node right now because other threads can load it but still not read it.
125///     unsafe {
126///         local_manager() // Get the thread-local LocalManager
127///             .schedule_deallocate(ptr_to_deallocate);
128///     }
129///     // The node will be deallocated when it is safe.
130/// }
131/// ```
132pub struct LocalManager {
133    current_epoch: usize,
134    this_epoch_start: OrengineInstant,
135    was_passed_epoch: bool,
136    shared_manager: SharedManager,
137
138    prev_storage: Storage,
139    current_storage: Storage,
140}
141
142impl LocalManager {
143    /// Creates a new `LocalManager`.
144    pub(crate) fn new(shared_manager: &SharedManager) -> Self {
145        Self {
146            current_epoch: shared_manager.current_epoch(),
147            this_epoch_start: unsafe { MaybeUninit::zeroed().assume_init() },
148            was_passed_epoch: false,
149            shared_manager: shared_manager.clone(),
150
151            prev_storage: Storage::new(),
152            current_storage: Storage::new(),
153        }
154    }
155
156    /// Returns the number of bytes deallocated since creation of the `SharedManager`.
157    #[cfg(test)]
158    pub(crate) fn bytes_deallocated(&self) -> usize {
159        self.shared_manager.bytes_deallocated()
160    }
161
162    /// Returns the current epoch.
163    pub fn current_epoch(&self) -> usize {
164        self.current_epoch
165    }
166
167    /// Returns a reference to the associated [`SharedManager`].
168    pub fn shared_manager(&self) -> &SharedManager {
169        &self.shared_manager
170    }
171
172    /// Schedules deallocation of an object. It will be deallocated when it is safe.
173    ///
174    /// # Safety
175    ///
176    /// It requires the same safety conditions as [`dealloc`].
177    pub unsafe fn schedule_deallocate<T>(&mut self, ptr: *const T) {
178        self.current_storage
179            .to_deallocate
180            .push((ptr.cast::<u8>().cast_mut(), Layout::new::<T>()));
181    }
182
183    /// Schedules deallocation of the provided slice.
184    /// They will be deallocated when it is safe.
185    ///
186    /// # Safety
187    ///
188    /// It requires the same safety conditions as [`dealloc`].
189    ///
190    /// # Panics
191    ///
192    ///
193    pub unsafe fn schedule_deallocate_slice<T>(&mut self, ptr: *const T, len: usize) {
194        self.current_storage.to_deallocate.push((
195            ptr.cast::<u8>().cast_mut(),
196            unwrap_or_bug_hint(Layout::array::<T>(len)),
197        ));
198    }
199
200    /// Schedules dropping of the provided function.
201    /// It will be dropped when it is safe.
202    ///
203    /// The function can be a closure; therefore, it can be used to drop any object.
204    ///
205    /// # Safety
206    ///
207    /// It requires the same safety conditions as [`mem::ManuallyDrop::drop`].
208    pub unsafe fn schedule_drop<F: FnOnce()>(&mut self, func: F) {
209        self.current_storage.to_drop.push(Deferred::new(func));
210    }
211
212    /// Collects garbage or the previous epoch's storage.
213    pub(crate) fn collect_garbage(&mut self) {
214        self.prev_storage.clear(&self.shared_manager);
215    }
216
217    /// Reacts to the epoch change.
218    fn react_to_epoch_change(&mut self, global_epoch: usize, now: OrengineInstant) {
219        debug_assert_eq!(global_epoch, self.current_epoch + 1);
220
221        self.current_epoch = global_epoch;
222        self.this_epoch_start = now;
223        self.was_passed_epoch = false;
224
225        self.collect_garbage();
226    }
227
228    /// Maybe passes the epoch and frees some memory if it is safe.
229    ///
230    /// This function accepts the current time as a parameter to avoid
231    /// very often epoch passing.
232    ///
233    /// While at least one thread doesn't pass the epoch,
234    /// all other threads can't free memory.
235    pub fn maybe_pass_epoch(&mut self, now: OrengineInstant) {
236        #[cfg(not(test))]
237        const EXPECTED_EPOCH_DURATION: Duration = Duration::from_millis(10);
238
239        #[cfg(test)]
240        const EXPECTED_EPOCH_DURATION: Duration = Duration::from_micros(100);
241
242        if likely(now - self.this_epoch_start < EXPECTED_EPOCH_DURATION) {
243            return;
244        }
245
246        let global_epoch = self.shared_manager.current_epoch();
247
248        if unlikely(self.current_epoch < global_epoch) {
249            debug_assert!(self.was_passed_epoch);
250
251            self.react_to_epoch_change(global_epoch, now);
252
253            return;
254        }
255
256        debug_assert_eq!(self.current_epoch, global_epoch);
257
258        if likely(self.was_passed_epoch) {
259            return;
260        }
261
262        self.was_passed_epoch = true;
263
264        debug_assert!(
265            self.prev_storage.to_drop.is_empty() && self.prev_storage.to_deallocate.is_empty()
266        );
267        mem::swap(&mut self.prev_storage, &mut self.current_storage);
268
269        let was_changed = self.shared_manager.executor_passed_epoch();
270        if unlikely(was_changed) {
271            self.react_to_epoch_change(global_epoch + 1, now);
272        }
273    }
274
275    /// Deregisters the thread-local `LocalManager`.
276    /// After that the calling [`local_manager`] can cause undefined behavior before the next
277    /// [`registration`].
278    ///
279    /// # Safety
280    ///
281    /// * The thread must be registered in the [`SharedManager`].
282    /// * It is called only once for one [`registration`].
283    /// * After calling this function, the caller doesn't call [`local_manager`] before the next
284    ///   [`registration`].
285    ///
286    /// # Panics
287    ///
288    /// It the thread is not registered.
289    ///
290    /// [`registration`]: SharedManager::register_new_executor
291    pub unsafe fn deregister() {
292        struct DeregisterInNewEpochArgs {
293            epoch_at_start: usize,
294            storage: Storage,
295            shared_manager: SharedManager,
296        }
297
298        fn deregister_in_new_epoch(
299            mut args: DeregisterInNewEpochArgs
300        ) {
301            fn wait_new_epoch_and_clear(
302                shared_manager: &SharedManager,
303                mut storage: Storage,
304                epoch_at_start: usize
305            ) {
306                while shared_manager.current_epoch() == epoch_at_start {
307                    thread::sleep(Duration::from_millis(1));
308                }
309
310                storage.clear(shared_manager);
311            }
312
313            let is_new_epoch = args.shared_manager.deregister_executor();
314
315            if is_new_epoch {
316                debug_assert_ne!(args.epoch_at_start, args.shared_manager.current_epoch());
317
318                // The executor passed the current epoch and has been deregistered
319                args.storage.clear(&args.shared_manager);
320
321                return;
322            }
323
324            wait_new_epoch_and_clear(&args.shared_manager, args.storage, args.epoch_at_start);
325        }
326
327        let mut local_manager = LOCAL_MANAGER
328            .with(|local_manager_| unsafe {
329                (*local_manager_.get())
330                    .take()
331                    .expect("LocalManager is not registered in this thread")
332            });
333
334        let epoch_at_start = local_manager.current_epoch;
335        let mut full_storage = mem::replace(&mut local_manager.current_storage, Storage::new());
336
337        full_storage.append(&mut local_manager.prev_storage);
338
339        let mut args = DeregisterInNewEpochArgs {
340            epoch_at_start,
341            storage: full_storage,
342            shared_manager: local_manager
343                .shared_manager
344                .clone(),
345        };
346
347        // Maybe we still have not passed the current epoch
348        if !local_manager.was_passed_epoch {
349            deregister_in_new_epoch(args);
350
351            return;
352        }
353
354        while args.shared_manager.current_epoch() == args.epoch_at_start {
355            thread::sleep(Duration::from_millis(1));
356        }
357
358        args.epoch_at_start += 1;
359
360        deregister_in_new_epoch(args);
361    }
362}
363
364thread_local! {
365    /// A thread-local [`LocalManager`].
366    pub(crate) static LOCAL_MANAGER: UnsafeCell<Option<LocalManager>> = const { UnsafeCell::new(None) };
367}
368
369/// Returns a reference to the thread-local [`LocalManager`].
370///
371/// # Panics
372///
373/// It the thread is not registered.
374///
375/// # Undefined behavior
376///
377/// If the thread is not registered when `cfg(debug_assertions)` is disabled,
378/// it causes undefined behavior.
379pub fn local_manager() -> &'static mut LocalManager {
380    LOCAL_MANAGER
381        .with(|local_manager| unsafe {
382            unwrap_or_bug_message_hint(
383                (*local_manager.get()).as_mut(),
384                "Local manager is not registered in this thread."
385            )
386        })
387}