use std::alloc::{Layout, dealloc};
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::time::Duration;
use std::{mem, thread};
use orengine_utils::clear_with;
use orengine_utils::hints::{likely, unlikely, unwrap_or_bug_hint, unwrap_or_bug_message_hint};
use orengine_utils::OrengineInstant;
use crate::deffered::Deferred;
use crate::shared_manager::SharedManager;
/// Garbage waiting to be reclaimed: raw allocations to free and deferred calls to run.
struct Storage {
/// Pointer/layout pairs that `Storage::clear` hands to `dealloc`.
to_deallocate: Vec<(*mut u8, Layout)>,
/// Deferred calls that `Storage::clear` invokes through `Deferred::call`.
to_drop: Vec<Deferred>,
}
impl Storage {
    /// Creates a storage with two empty lists; usable in `const` contexts.
    const fn new() -> Self {
        Self {
            to_deallocate: Vec::new(),
            to_drop: Vec::new(),
        }
    }

    /// Reclaims everything that was scheduled: every recorded allocation is handed to
    /// `dealloc` and every deferred call is invoked.
    ///
    /// Both lists are processed through `clear_with` (presumably it drains the vector
    /// and passes each element by value — confirm against `orengine_utils`).
    ///
    /// Entries with a zero-sized layout are skipped. The global allocator never hands
    /// out zero-sized blocks, so such a pointer (a `Box` of a zero-sized type, an empty
    /// boxed slice, ...) is dangling and passing it to `dealloc` would be undefined
    /// behaviour.
    ///
    /// `shared_manager` is only read in test builds, where it accumulates the number of
    /// bytes deallocated.
    #[allow(unused_variables, reason = "It is used for tests")]
    fn clear(&mut self, shared_manager: &SharedManager) {
        clear_with(&mut self.to_deallocate, |(ptr, layout)| {
            if layout.size() == 0 {
                // Nothing was ever allocated for a zero-sized layout.
                return;
            }

            // SAFETY: the callers of the `schedule_deallocate*` methods promise that
            // `ptr` was allocated by the global allocator with exactly `layout`.
            unsafe {
                #[cfg(test)]
                {
                    shared_manager.increment_bytes_deallocated(layout.size());
                }
                dealloc(ptr, layout);
            }
        });

        clear_with(&mut self.to_drop, |f| {
            f.call();
        });
    }

    /// Moves every entry of `other` into `self`, leaving `other` empty.
    fn append(&mut self, other: &mut Self) {
        self.to_deallocate.append(&mut other.to_deallocate);
        self.to_drop.append(&mut other.to_drop);
    }
}
// `Storage` holds raw pointers (`*mut u8`), which are neither `Send` nor `Sync` by
// default, so both traits are asserted manually here.
// NOTE(review): soundness also depends on everything captured by the stored `Deferred`
// calls being safe to run on another thread — confirm against `Deferred`.
#[allow(
clippy::non_send_fields_in_send_ty,
reason = "We guarantee that it is `Send`"
)]
unsafe impl Send for Storage {}
// NOTE(review): no `&self` method of `Storage` is visible in this file; confirm that
// shared access from several threads is really needed before relying on `Sync`.
unsafe impl Sync for Storage {}
/// Per-thread side of the reclamation scheme: collects scheduled garbage locally and
/// follows the epoch published by the [`SharedManager`].
pub struct LocalManager {
/// Last global epoch this manager has reacted to.
current_epoch: usize,
/// Instant at which this manager entered `current_epoch` (set in `react_to_epoch_change`).
this_epoch_start: OrengineInstant,
/// Whether this manager already reported `current_epoch` as passed to the shared manager.
was_passed_epoch: bool,
/// Handle to the shared state (global epoch, executor registration).
shared_manager: SharedManager,
/// `true` between `temporary_deregister` and `resume_after_temporary_deregister`.
was_stopped: bool,
/// Garbage handed over when the epoch was passed; cleared by `collect_garbage`.
prev_storage: Storage,
/// Garbage scheduled since the last hand-over.
current_storage: Storage,
}
impl LocalManager {
pub(crate) fn new(shared_manager: &SharedManager) -> Self {
Self {
current_epoch: shared_manager.current_epoch(),
this_epoch_start: unsafe { MaybeUninit::zeroed().assume_init() },
was_passed_epoch: false,
shared_manager: shared_manager.clone(),
was_stopped: false,
prev_storage: Storage::new(),
current_storage: Storage::new(),
}
}
/// Cold path taken when the manager is used while `was_stopped` is set.
///
/// # Panics
///
/// Always: first asserts that `was_stopped` is really set, then panics with a message
/// naming the two methods the misuse happened between.
#[cold]
#[inline(never)]
fn handle_stopped(&self) {
assert!(self.was_stopped);
panic!(
"`LocalManager` is used after calling `LocalManager::temporary_deregister` \
and before calling `LocalManager::resume_after_temporary_deregister`."
)
}
/// Test-only: forwards to `SharedManager::bytes_deallocated` of the shared manager.
#[cfg(test)]
pub(crate) fn bytes_deallocated(&self) -> usize {
self.shared_manager.bytes_deallocated()
}
/// Returns the epoch this manager currently considers itself to be in.
///
/// This is the locally cached value; it is only updated when the manager reacts to a
/// change of the global epoch.
pub fn current_epoch(&self) -> usize {
self.current_epoch
}
/// Returns a reference to the [`SharedManager`] this manager was created from.
pub fn shared_manager(&self) -> &SharedManager {
&self.shared_manager
}
/// Schedules the allocation behind `ptr` to be freed later with `Layout::new::<T>()`.
///
/// Only the memory is released; no destructor of `T` is run by this path.
///
/// # Safety
///
/// The pointer is eventually passed to `std::alloc::dealloc` together with
/// `Layout::new::<T>()`, so it must satisfy that function's contract when the
/// garbage is collected.
///
/// # Panics
///
/// Panics if called between `temporary_deregister` and
/// `resume_after_temporary_deregister`.
pub unsafe fn schedule_deallocate<T>(&mut self, ptr: *const T) {
    if unlikely(self.was_stopped) {
        self.handle_stopped();
    }

    let raw = ptr.cast_mut().cast::<u8>();
    let layout = Layout::new::<T>();
    self.current_storage.to_deallocate.push((raw, layout));
}
/// Schedules the allocation of `len` contiguous `T`s starting at `ptr` to be freed
/// later with `Layout::array::<T>(len)`.
///
/// Only the memory is released; no element destructors are run by this path.
///
/// # Safety
///
/// The pointer is eventually passed to `std::alloc::dealloc` together with
/// `Layout::array::<T>(len)`, so it must satisfy that function's contract when the
/// garbage is collected. A `len` for which the array layout cannot be computed is
/// treated as a bug by `unwrap_or_bug_hint`.
///
/// # Panics
///
/// Panics if called between `temporary_deregister` and
/// `resume_after_temporary_deregister`.
pub unsafe fn schedule_deallocate_slice<T>(&mut self, ptr: *const T, len: usize) {
    if unlikely(self.was_stopped) {
        self.handle_stopped();
    }

    let raw = ptr.cast_mut().cast::<u8>();
    let layout = unwrap_or_bug_hint(Layout::array::<T>(len));
    self.current_storage.to_deallocate.push((raw, layout));
}
/// Schedules `func` to be called later, when the garbage of this manager is collected.
///
/// # Safety
///
/// `func` is wrapped in a `Deferred` and invoked at an unspecified later point.
/// NOTE(review): presumably everything it captures must stay valid until then —
/// confirm against `Deferred::new`.
///
/// # Panics
///
/// Panics if called between `temporary_deregister` and
/// `resume_after_temporary_deregister`.
pub unsafe fn schedule_drop<F: FnOnce()>(&mut self, func: F) {
    if unlikely(self.was_stopped) {
        self.handle_stopped();
    }

    let deferred = Deferred::new(func);
    self.current_storage.to_drop.push(deferred);
}
pub(crate) fn collect_garbage(&mut self) {
self.prev_storage.clear(&self.shared_manager);
}
/// Moves this manager into `global_epoch`, which started (from its point of view) at `now`.
///
/// Resets the "passed" flag for the new epoch and reclaims the garbage handed over in
/// the previous one. In debug builds it checks that the epoch advanced by exactly one.
fn react_to_epoch_change(&mut self, global_epoch: usize, now: OrengineInstant) {
    debug_assert_eq!(global_epoch, self.current_epoch + 1);

    self.was_passed_epoch = false;
    self.this_epoch_start = now;
    self.current_epoch = global_epoch;

    self.collect_garbage();
}
/// Gives the manager a chance to make progress; does nothing until the current epoch
/// has lasted at least the expected duration (10 ms, or 100 µs in test builds).
///
/// Once the epoch is old enough:
/// * if the global epoch is already ahead, the manager catches up and collects the
///   garbage of the previous epoch;
/// * otherwise, if it has not done so yet, it hands the current garbage over to the
///   previous-epoch storage and reports the epoch as passed to the shared manager.
///   If that report changed the global epoch, the manager reacts to it right away.
///
/// # Panics
///
/// Panics if called between `temporary_deregister` and
/// `resume_after_temporary_deregister`.
pub fn maybe_pass_epoch(&mut self, now: impl Into<OrengineInstant>) {
    #[cfg(test)]
    const EXPECTED_EPOCH_DURATION: Duration = Duration::from_micros(100);
    #[cfg(not(test))]
    const EXPECTED_EPOCH_DURATION: Duration = Duration::from_millis(10);

    let now = now.into();

    if unlikely(self.was_stopped) {
        self.handle_stopped();
    }

    // Fast path: the epoch is still too young to bother the shared manager.
    let epoch_is_young = now - self.this_epoch_start < EXPECTED_EPOCH_DURATION;
    if likely(epoch_is_young) {
        return;
    }

    let global_epoch = self.shared_manager.current_epoch();

    // The global epoch moved on after this manager reported its pass: catch up.
    if unlikely(global_epoch > self.current_epoch) {
        debug_assert!(self.was_passed_epoch);
        self.react_to_epoch_change(global_epoch, now);
        return;
    }

    debug_assert_eq!(self.current_epoch, global_epoch);

    // Already reported for this epoch; waiting for the global epoch to change.
    if likely(self.was_passed_epoch) {
        return;
    }

    self.was_passed_epoch = true;

    debug_assert!(
        self.prev_storage.to_drop.is_empty() && self.prev_storage.to_deallocate.is_empty()
    );

    // Hand the garbage of this epoch over; the (empty) previous storage becomes current.
    mem::swap(&mut self.current_storage, &mut self.prev_storage);

    if unlikely(self.shared_manager.executor_passed_epoch()) {
        self.react_to_epoch_change(global_epoch + 1, now);
    }
}
/// Removes the `LocalManager` of the current thread, deregisters it from the shared
/// manager and reclaims all garbage it still holds.
///
/// The call blocks (sleeping in 1 ms steps) until the global epoch has changed
/// relative to the epoch in which the executor is deregistered, and only then clears
/// the garbage. If the manager had already reported the current epoch as passed, it
/// first waits for the next epoch to begin before deregistering.
///
/// # Safety
///
/// NOTE(review): the contract is not stated in this file. The thread-local slot is
/// accessed through a raw pointer, so presumably no reference obtained from
/// `local_manager()` may be alive during this call — confirm.
///
/// # Panics
///
/// Panics if no `LocalManager` is registered in the current thread.
pub unsafe fn deregister() {
    /// Blocks until the global epoch is no longer `epoch`.
    fn wait_until_epoch_differs(shared_manager: &SharedManager, epoch: usize) {
        while shared_manager.current_epoch() == epoch {
            thread::sleep(Duration::from_millis(1));
        }
    }

    let mut manager = LOCAL_MANAGER.with(|slot| unsafe {
        (*slot.get())
            .take()
            .expect("LocalManager is not registered in this thread")
    });

    // Gather everything that is still pending into a single storage.
    let mut garbage = mem::replace(&mut manager.current_storage, Storage::new());
    garbage.append(&mut manager.prev_storage);

    let shared_manager = manager.shared_manager.clone();
    let mut epoch_at_start = manager.current_epoch;

    if manager.was_passed_epoch {
        // This executor already counted for the current epoch, so stay registered
        // until the next epoch begins and deregister from there.
        wait_until_epoch_differs(&shared_manager, epoch_at_start);
        epoch_at_start += 1;
    }

    let is_new_epoch = shared_manager.deregister_executor();
    if is_new_epoch {
        debug_assert_ne!(epoch_at_start, shared_manager.current_epoch());
    } else {
        wait_until_epoch_differs(&shared_manager, epoch_at_start);
    }

    garbage.clear(&shared_manager);
}
/// Marks this manager as stopped and deregisters its executor from the shared manager.
///
/// While stopped, the scheduling methods and `maybe_pass_epoch` panic. The value
/// returned by `deregister_executor` is ignored here.
///
/// # Safety
///
/// NOTE(review): the contract is not stated in this file; presumably the call must be
/// paired with `resume_after_temporary_deregister` — confirm.
pub unsafe fn temporary_deregister(&mut self) {
    self.was_stopped = true;
    self.shared_manager.deregister_executor();
}
/// Clears the stopped mark and registers the executor with the shared manager again.
///
/// The locally cached epoch is not refreshed by this call.
///
/// # Safety
///
/// NOTE(review): the contract is not stated in this file; presumably it may only be
/// called after `temporary_deregister` — confirm.
pub unsafe fn resume_after_temporary_deregister(&mut self) {
    self.was_stopped = false;
    self.shared_manager.register_executor_again();
}
}
// Per-thread slot for the `LocalManager`. It starts as `None`; `LocalManager::deregister`
// takes the value out again. The `UnsafeCell` lets `local_manager()` hand out a mutable
// reference without any runtime borrow tracking.
thread_local! {
pub(crate) static LOCAL_MANAGER: UnsafeCell<Option<LocalManager>> = const { UnsafeCell::new(None) };
}
/// Returns the `LocalManager` registered in the current thread.
///
/// A missing registration is treated as a bug and reported through
/// `unwrap_or_bug_message_hint`.
///
/// NOTE(review): the returned `&'static mut` comes from an `UnsafeCell` without any
/// borrow tracking, so callers must not keep two of these references alive at the
/// same time, nor keep one across `LocalManager::deregister`.
pub fn local_manager() -> &'static mut LocalManager {
    LOCAL_MANAGER.with(|cell| unsafe {
        let slot = &mut *cell.get();

        unwrap_or_bug_message_hint(
            slot.as_mut(),
            "Local manager is not registered in this thread."
        )
    })
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Condvar, Mutex};
use super::*;
// Checks that a temporarily deregistered executor does not block the other one:
// while the main thread is stopped, the spawned thread must finish its
// `maybe_pass_epoch` loop and signal completion within the timeout.
#[test]
fn test_temporary_deregister() {
let shared_manager = SharedManager::new();
// Flag + condvar: the spawned thread has registered itself.
let was_started = Arc::new((Mutex::new(false), Condvar::new()));
let was_started_clone = was_started.clone();
// Flag + condvar: the spawned thread has finished its `maybe_pass_epoch` loop.
let was_passed = Arc::new((Mutex::new(false), Condvar::new()));
let was_passed_clone = was_passed.clone();
// Register the main thread as an executor.
// NOTE(review): presumably this also installs the thread-local `LocalManager`
// that `local_manager()` returns below — confirm in `SharedManager`.
shared_manager.register_new_executor();
let handle = thread::spawn(move || {
// Register the spawned thread as a second executor.
shared_manager.register_new_executor();
*was_started.0.lock().unwrap() = true;
was_started.1.notify_one();
// Ten attempts, 1 ms apart; the test-build epoch duration is 100 µs.
for _ in 0..10 {
local_manager().maybe_pass_epoch(OrengineInstant::now());
thread::sleep(Duration::from_millis(1));
}
*was_passed.0.lock().unwrap() = true;
was_passed.1.notify_one();
unsafe { LocalManager::deregister(); }
});
// Wait until the second executor is registered before stopping this one.
let mut started = was_started_clone.0.lock().unwrap();
while !*started {
started = was_started_clone.1.wait(started).unwrap();
}
drop(started);
unsafe { local_manager().temporary_deregister(); }
// Wait for the spawned thread to finish its loop; each individual wait may take
// at most 3 seconds, otherwise the test fails.
let mut passed = was_passed_clone.0.lock().unwrap();
let mut timeout_error;
while !*passed {
(passed, timeout_error) = was_passed_clone.1.wait_timeout(passed, Duration::from_secs(3)).unwrap();
assert!(!timeout_error.timed_out());
}
drop(passed);
// Resume first, then deregister the main thread's manager for good.
unsafe {
local_manager().resume_after_temporary_deregister();
LocalManager::deregister();
}
handle.join().unwrap();
}
}