use std::cell::{Cell, UnsafeCell};
use std::sync::atomic::{Ordering, AtomicBool};
use std::mem::ManuallyDrop;
use std::fmt::{Debug, Formatter};
use std::{fmt, mem};
use std::collections::HashSet;
use parking_lot::{Mutex, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard, Condvar, RwLock};
use slog::{Logger, FnValue, trace, Drain, o};
use super::{ShadowStack, ContextState};
use crate::{RawCollectorImpl, CollectorRef};
use crate::utils::ThreadId;
use crate::collector::SyncCollector;
/// Coordinates synchronous (stop-the-world) collections for collector `C`.
pub struct CollectionManager<C: RawCollectorImpl> {
    /// The shared collector state, including any pending collection.
    state: RwLock<CollectorState<C>>,
    /// Flag set while a collection is pending/in progress; set when a
    /// `PendingCollection` is created and cleared when it is fully
    /// acknowledged, so it can be checked without taking `state`'s lock.
    collecting: AtomicBool,
    /// Notified when a context becomes valid for collection
    /// (reaches a safepoint, is frozen, or is freed).
    valid_contexts_wait: Condvar,
    /// Notified when a pending collection finishes.
    collection_wait: Condvar,
    /// Mutex paired with `valid_contexts_wait` (parking_lot condvars
    /// wait on an explicit mutex).
    valid_contexts_lock: Mutex<()>,
    /// Mutex paired with `collection_wait`.
    collection_wait_lock: Mutex<()>,
}
impl<C: SyncCollector> super::sealed::Sealed for CollectionManager<C> {}
unsafe impl<C> super::CollectionManager<C> for CollectionManager<C>
    where C: SyncCollector,
          C: RawCollectorImpl<Manager=Self, RawContext=RawContext<C>> {
    type Context = RawContext<C>;
    /// Creates a fresh manager with no collection pending.
    fn new() -> Self {
        // This manager only supports synchronous collectors.
        assert!(C::SYNC);
        CollectionManager {
            state: RwLock::new(CollectorState::new()),
            valid_contexts_wait: Condvar::new(),
            collection_wait: Condvar::new(),
            valid_contexts_lock: Mutex::new(()),
            collection_wait_lock: Mutex::new(()),
            collecting: AtomicBool::new(false),
        }
    }
    /// Whether a collection is currently pending or in progress.
    #[inline]
    fn is_collecting(&self) -> bool {
        self.collecting.load(Ordering::SeqCst)
    }
    /// Fast-path safepoint check. A Relaxed load suffices here: a stale
    /// read only delays entry to the slow path, which re-checks under
    /// the state lock in `trigger_safepoint`.
    #[inline]
    fn should_trigger_collection(&self) -> bool {
        self.collecting.load(Ordering::Relaxed)
    }
    /// Transitions an `Active` context to `Frozen`.
    ///
    /// A frozen context counts as "valid" for a pending collection
    /// (see the `is_frozen` filter in `trigger_safepoint`), so waiters
    /// are notified that the valid-context set may have grown.
    ///
    /// ## Safety
    /// The context must currently be in the `Active` state.
    unsafe fn freeze_context(&self, context: &RawContext<C>) {
        assert_eq!(context.state.get(), ContextState::Active);
        context.state.set(ContextState::Frozen);
        self.valid_contexts_wait.notify_all();
    }
    /// Transitions a `Frozen` context back to `Active`.
    ///
    /// Runs under `prevent_collection` so the transition can't race with
    /// a collection that counted this context as frozen (and thus valid).
    ///
    /// ## Safety
    /// The context must currently be in the `Frozen` state.
    unsafe fn unfreeze_context(&self, context: &RawContext<C>) {
        context.collector.as_raw().prevent_collection(|_| {
            assert_eq!(context.state.get(), ContextState::Frozen);
            context.state.set(ContextState::Active);
        })
    }
    /// Runs `func` while no collection can start (delegates to
    /// `SyncCollectorImpl::prevent_collection`).
    #[inline]
    fn prevent_collection<R>(collector: &C, func: impl FnOnce() -> R) -> R {
        collector.prevent_collection(|_state| func())
    }
    /// Unregisters and frees a context (delegates to
    /// `SyncCollectorImpl::free_context`).
    ///
    /// ## Safety
    /// `context` must be a registered context that is no longer in use.
    #[inline]
    unsafe fn free_context(collector: &C, context: *mut Self::Context) {
        collector.free_context(context)
    }
}
/// Per-thread context for a synchronous collector.
pub struct RawContext<C: RawCollectorImpl> {
    /// Reference back to the owning collector.
    pub(crate) collector: CollectorRef<C>,
    /// Thread the context was created on; only captured when trace
    /// logging is enabled, otherwise `ThreadId::Nop` (see `register_new`).
    original_thread: ThreadId,
    /// This context's shadow stack of GC roots.
    pub(super) shadow_stack: UnsafeCell<ShadowStack<C>>,
    /// Current state (active / frozen / stopped at a safepoint).
    pub(super) state: Cell<ContextState>,
    /// Logger scoped to this context (tagged with `original_thread`).
    pub(super) logger: Logger
}
impl<C: RawCollectorImpl> Debug for RawContext<C> {
    /// Formats the context: collector pointer, shadow stack contents,
    /// and current state.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        // SAFETY: assumes the shadow stack isn't being mutated while we
        // format it (Debug output is best-effort diagnostics).
        let stack = unsafe { &*self.shadow_stack.get() };
        let mut builder = f.debug_struct("RawContext");
        builder.field("collector", &format_args!("{:p}", &self.collector));
        builder.field("shadow_stacks", stack);
        builder.field("state", &self.state.get());
        builder.finish()
    }
}
impl<C: SyncCollector> super::sealed::Sealed for RawContext<C> {}
unsafe impl<C> super::RawContext<C> for RawContext<C>
    where C: SyncCollector<RawContext=Self, Manager=CollectionManager<C>> {
    /// Allocates and registers a new context with the collector.
    ///
    /// The result is `ManuallyDrop` because the allocation is freed
    /// later through `free_context`, not by a normal drop.
    ///
    /// ## Safety
    /// The collector must outlive the returned context.
    unsafe fn register_new(collector: &CollectorRef<C>) -> ManuallyDrop<Box<Self>> {
        // Only capture a real thread id when trace logging will use it.
        let original_thread = if collector.as_raw().logger().is_trace_enabled() {
            ThreadId::current()
        } else {
            ThreadId::Nop
        };
        let mut context = ManuallyDrop::new(Box::new(RawContext {
            collector: collector.clone_internal(),
            original_thread: original_thread.clone(),
            logger: collector.as_raw().logger().new(o!(
                "original_thread" => original_thread.clone()
            )),
            shadow_stack: UnsafeCell::new(ShadowStack {
                last: std::ptr::null_mut(),
            }),
            state: Cell::new(ContextState::Active)
        }));
        // Registration runs under prevent_collection, so it blocks while
        // a collection is pending.
        let old_num_total = collector.as_raw().add_context(&mut **context);
        trace!(
            collector.as_raw().logger(), "Creating new context";
            "ptr" => format_args!("{:p}", &**context),
            "old_num_total" => old_num_total,
            "current_thread" => &original_thread
        );
        context
    }
    /// Stops this context at a safepoint, starting a collection if one
    /// isn't already pending, and blocks until that collection finishes.
    ///
    /// Cold and never inlined: the fast path checks
    /// `should_trigger_collection` before calling this.
    ///
    /// ## Safety
    /// This context must currently be in the `Active` state.
    #[cold]
    #[inline(never)]
    unsafe fn trigger_safepoint(&self) {
        let collector = self.collector.as_raw();
        // Exclusive access to collector state for the whole setup phase.
        let mut guard = collector.manager().state.write();
        let state = &mut *guard;
        if state.pending.is_none() {
            // We are the first context to reach a safepoint: start a new
            // collection. The `collecting` flag must have been clear.
            assert_eq!(collector.manager().collecting.compare_exchange(
                false, true,
                Ordering::SeqCst,
                Ordering::Relaxed,
            ), Ok(false));
            let id = state.next_pending_id();
            #[allow(clippy::mutable_key_type)]
            let known_contexts = state.known_contexts.get_mut();
            // Frozen contexts already count as "valid" — they are stopped
            // and safe to collect without checking in.
            state.pending = Some(PendingCollection::new(
                id,
                known_contexts.iter()
                    .cloned()
                    .filter(|ctx| (**ctx).state.get().is_frozen())
                    .collect(),
                known_contexts.len(),
            ));
            trace!(
                self.logger,
                "Creating collector";
                "id" => id,
                "ctx_ptr" => format_args!("{:?}", self),
                "initially_valid_contexts" => ?state.pending.as_ref()
                    .unwrap().valid_contexts,
                "known_contexts" => FnValue(|_| {
                    #[allow(clippy::mutable_key_type)]
                    let mut map = std::collections::HashMap::new();
                    for &context in &*known_contexts {
                        map.insert(context, format!("{:?} @ {:?}: {:?}",
                            (*context).state.get(),
                            (*context).original_thread,
                            &*(*context).shadow_stack.get()
                        ));
                    }
                    format!("{:?}", map)
                })
            );
        }
        let shadow_stack = &*self.shadow_stack.get();
        let ptr = self as *const RawContext<C> as *mut RawContext<C>;
        debug_assert!(state.known_contexts.get_mut().contains(&ptr));
        let mut pending = state.pending.as_mut().unwrap();
        // Mark ourselves as stopped at this collection's safepoint.
        assert_eq!(self.state.replace(ContextState::SafePoint {
            collection_id: pending.id
        }), ContextState::Active);
        debug_assert_eq!(
            state.known_contexts.get_mut().len(),
            pending.total_contexts
        );
        pending.push_pending_context(ptr);
        let expected_id = pending.id;
        pending.waiting_contexts += 1;
        trace!(
            self.logger, "Awaiting collection";
            "ptr" => ?ptr,
            "current_thread" => FnValue(|_| ThreadId::current()),
            "shadow_stack" => FnValue(|_| format!("{:?}", shadow_stack.as_vec())),
            "total_contexts" => pending.total_contexts,
            "waiting_contexts" => pending.waiting_contexts,
            "state" => ?pending.state,
            "collector_id" => expected_id
        );
        // Block until the collection completes. If this thread is the
        // last context to check in, the closure below performs the
        // collection itself while still holding the write guard.
        collector.await_collection(
            expected_id, ptr, guard,
            |state, contexts| {
                let pending = state.pending.as_mut().unwrap();
                trace!(
                    self.logger, "Beginning simple collection";
                    "current_thread" => FnValue(|_| ThreadId::current()),
                    "original_size" => %collector.allocated_size(),
                    "contexts" => ?contexts,
                    "total_contexts" => pending.total_contexts,
                    "state" => ?pending.state,
                    "collector_id" => pending.id,
                );
                collector.perform_raw_collection(&contexts);
                assert_eq!(
                    pending.state,
                    PendingState::InProgress
                );
                pending.state = PendingState::Finished;
                // Acknowledge completion for this context immediately,
                // while we still hold the state lock.
                collector.acknowledge_finished_collection(
                    &mut state.pending, ptr
                );
            }
        );
        trace!(
            self.logger, "Finished waiting for collection";
            "current_thread" => FnValue(|_| ThreadId::current()),
            "collector_id" => expected_id,
        );
    }
    /// Raw pointer to this context's shadow stack.
    #[inline]
    fn shadow_stack_ptr(&self) -> *mut ShadowStack<C> {
        self.shadow_stack.get()
    }
    /// Reference to the underlying collector.
    ///
    /// ## Safety
    /// The collector must still be alive.
    #[inline]
    unsafe fn collector(&self) -> &C {
        self.collector.as_raw()
    }
    /// Current state of this context.
    #[inline]
    fn state(&self) -> ContextState {
        self.state.get()
    }
}
/// Mutable state of the sync collector, guarded by the manager's `RwLock`.
pub struct CollectorState<C: RawCollectorImpl> {
    /// The currently pending collection, if any.
    pending: Option<PendingCollection<C>>,
    /// All registered contexts. Has its own mutex so it can be modified
    /// while holding only a read lock on the surrounding state
    /// (see `add_context`).
    known_contexts: Mutex<HashSet<*mut RawContext<C>>>,
    /// Id to assign to the next pending collection.
    next_pending_id: u64
}
impl<C: RawCollectorImpl> CollectorState<C> {
    /// Constructs an empty state: no pending collection, no known
    /// contexts, and collection ids starting from zero.
    pub fn new() -> Self {
        CollectorState {
            next_pending_id: 0,
            known_contexts: Mutex::new(HashSet::new()),
            pending: None,
        }
    }
    /// Hands out the next collection id, panicking on `u64` overflow.
    fn next_pending_id(&mut self) -> u64 {
        let current = self.next_pending_id;
        self.next_pending_id = current.checked_add(1).expect("Overflow");
        current
    }
}
/// Synchronous-collection machinery, available for any collector whose
/// manager is the sync [`CollectionManager`].
pub(crate) trait SyncCollectorImpl: RawCollectorImpl<Manager=CollectionManager<Self>> {
    /// Runs `func` while guaranteeing no collection is in progress.
    ///
    /// Blocks until any pending collection finishes, then invokes `func`
    /// while holding a read lock on the state (which keeps a new
    /// collection from starting, since starting one needs a write lock).
    fn prevent_collection<R>(&self, func: impl FnOnce(&CollectorState<Self>) -> R) -> R {
        let mut state = self.manager().state.read();
        while state.pending.is_some() {
            // Release the read lock while blocking so the collection can
            // progress and eventually clear `pending`.
            // NOTE(review): there is a window between dropping the read
            // lock and blocking on `collection_wait` where a
            // `notify_all` could fire unseen; the loop re-checks
            // `pending` after every wakeup, but recovery from a missed
            // notify relies on a subsequent notification — confirm this
            // is intended.
            RwLockReadGuard::unlocked(&mut state, || {
                let mut lock = self.manager().collection_wait_lock.lock();
                self.manager().collection_wait.wait(&mut lock);
            })
        }
        // No collection pending implies the collecting flag is clear.
        assert!(!self.manager().collecting.load(Ordering::SeqCst));
        func(&*state)
    }
    /// Registers a new context, returning the previous number of known
    /// contexts. Runs under `prevent_collection` since registering during
    /// a collection would be unsound.
    ///
    /// ## Safety
    /// `raw` must point to a valid, fully initialized context.
    unsafe fn add_context(&self, raw: *mut RawContext<Self>) -> usize {
        self.prevent_collection(|state| {
            let mut known_contexts = state.known_contexts.lock();
            let old_num_known = known_contexts.len();
            assert!(known_contexts.insert(raw), "Already inserted {:p}", raw);
            old_num_known
        })
    }
    /// Unregisters `raw` and frees its allocation.
    ///
    /// ## Safety
    /// `raw` must have been registered via `add_context` and must not be
    /// used after this call. Must not be called while a collection is
    /// actually in progress (see the `unreachable!` arm).
    unsafe fn free_context(&self, raw: *mut RawContext<Self>) {
        trace!(
            self.logger(), "Freeing context";
            "ptr" => format_args!("{:p}", raw),
            "state" => ?(*raw).state.get()
        );
        let ptr = raw;
        // Upgradable read: write access is only needed in the
        // `Waiting` case below.
        let guard = self.manager().state.upgradable_read();
        match &guard.pending {
            None => {
                // No collection pending — just remove from the known set.
                assert!(guard.known_contexts.lock().remove(&ptr));
                drop(guard);
            },
            Some(pending @ PendingCollection { state: PendingState::Finished, .. }) => {
                // Collection already ran; this context must not still be
                // tracked as one of its valid contexts.
                assert!(!pending.valid_contexts.contains(&ptr));
                assert!(guard.known_contexts.lock().remove(&ptr));
                drop(guard);
            },
            Some(PendingCollection { state: PendingState::Waiting, .. }) => {
                // A collection is waiting on contexts: shrink its expected
                // total so it can proceed without us.
                let mut guard = RwLockUpgradableReadGuard::upgrade(guard);
                let pending = guard.pending.as_mut().unwrap();
                // A context being freed can't have checked in already.
                assert_eq!(
                    pending.valid_contexts.iter()
                        .find(|&&ctx| std::ptr::eq(ctx, ptr)),
                    None, "state = {:?}", (*raw).state.get()
                );
                pending.total_contexts -= 1;
                assert!(guard.known_contexts.get_mut().remove(&ptr));
                drop(guard);
            },
            Some(PendingCollection { state: PendingState::InProgress, .. }) => {
                unreachable!("cant free while collection is in progress")
            },
        }
        // Free the allocation created in `register_new`.
        drop(Box::from_raw(raw));
        // The waiting collection (if any) may now have all the contexts
        // it needs, since total_contexts just shrank.
        self.manager().valid_contexts_wait.notify_all();
    }
    /// Waits until the collection identified by `expected_id` completes.
    ///
    /// If this thread is the last one to check in (every known context is
    /// valid), it runs `perform_collection` itself; otherwise it blocks on
    /// the appropriate condvar until woken, releasing the write lock while
    /// waiting.
    ///
    /// ## Safety
    /// `context` must already be registered with the pending collection,
    /// and `lock` must guard this collector's state.
    unsafe fn await_collection(
        &self,
        expected_id: u64,
        context: *mut RawContext<Self>,
        mut lock: RwLockWriteGuard<CollectorState<Self>>,
        perform_collection: impl FnOnce(
            &mut CollectorState<Self>,
            Vec<*mut RawContext<Self>>
        )
    ) {
        loop {
            match &mut lock.pending {
                Some(ref mut pending) => {
                    // A pending collection can't be replaced before it's
                    // acknowledged, so the id must still match ours.
                    assert_eq!(expected_id, pending.id);
                    debug_assert!(self.manager().collecting.load(Ordering::SeqCst));
                    match pending.state {
                        PendingState::Finished => {
                            // Someone else ran the collection; just
                            // acknowledge and leave.
                            self.acknowledge_finished_collection(
                                &mut lock.pending, context
                            );
                            drop(lock);
                            return
                        },
                        PendingState::Waiting if pending.valid_contexts.len() ==
                            pending.total_contexts => {
                            // Every known context checked in: this thread
                            // becomes the collector.
                            assert_eq!(
                                std::mem::replace(
                                    &mut pending.state,
                                    PendingState::InProgress
                                ),
                                PendingState::Waiting
                            );
                            // Debug builds keep valid_contexts intact so
                            // acknowledge_finished_collection can verify
                            // membership; release builds move it out.
                            let contexts = if cfg!(debug_assertions) {
                                pending.valid_contexts.clone()
                            } else {
                                mem::replace(
                                    &mut pending.valid_contexts,
                                    Vec::new()
                                )
                            };
                            perform_collection(&mut *lock, contexts);
                            drop(lock);
                            // Wake everyone blocked on the collection.
                            self.manager().collection_wait.notify_all();
                            return
                        }
                        PendingState::Waiting => {
                            // Still missing contexts: wait for more to
                            // reach a safepoint (or freeze/free).
                            RwLockWriteGuard::unlocked(&mut lock, || {
                                let mut lock = self.manager().valid_contexts_lock.lock();
                                self.manager().valid_contexts_wait.wait(&mut lock);
                            })
                        }
                        PendingState::InProgress => {
                            // Another thread is collecting: wait for it.
                            RwLockWriteGuard::unlocked(&mut lock, || {
                                let mut lock = self.manager().collection_wait_lock.lock();
                                self.manager().collection_wait.wait(&mut lock);
                            })
                        }
                    }
                }
                None => {
                    panic!(
                        "Unexpectedly finished collection: {}",
                        expected_id
                    )
                }
            }
        }
    }
    /// Acknowledges the finished collection on behalf of `context`.
    ///
    /// Restores the context to `Active`; the last context to acknowledge
    /// clears `pending`, resets the `collecting` flag, and wakes threads
    /// blocked in `prevent_collection`.
    ///
    /// ## Safety
    /// `context` must be a valid context that participated in the (now
    /// `Finished`) pending collection.
    unsafe fn acknowledge_finished_collection(
        &self,
        pending_ref: &mut Option<PendingCollection<Self>>,
        context: *mut RawContext<Self>
    ) {
        let waiting_contexts = {
            let pending = pending_ref.as_mut().unwrap();
            assert_eq!(pending.state, PendingState::Finished);
            // Debug builds verify we were tracked as valid and remove
            // ourselves from the list (see await_collection's clone).
            if cfg!(debug_assertions) {
                match pending.valid_contexts.iter()
                    .position(|&ptr| std::ptr::eq(ptr, context)) {
                    Some(index) => {
                        pending.valid_contexts.remove(index);
                    },
                    None => panic!("Unable to find context: {:p}", context)
                }
            }
            // Leave the safepoint and become active again.
            assert_eq!(
                (*context).state.replace(ContextState::Active),
                ContextState::SafePoint {
                    collection_id: pending.id
                }
            );
            pending.waiting_contexts -= 1;
            pending.waiting_contexts
        };
        if waiting_contexts == 0 {
            // Last context out: the collection is fully acknowledged.
            *pending_ref = None;
            assert_eq!(self.manager().collecting.compare_exchange(
                true, false,
                Ordering::SeqCst,
                Ordering::Relaxed,
            ), Ok(true));
            // Wake threads blocked in prevent_collection.
            self.manager().collection_wait.notify_all();
        }
    }
}
// Blanket impl: every SyncCollector whose manager is the sync
// CollectionManager automatically gets the synchronous machinery.
impl<C> SyncCollectorImpl for C
    where C: crate::collector::SyncCollector,
          C: RawCollectorImpl<Manager=CollectionManager<Self>> {}
/// Lifecycle of a [`PendingCollection`].
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum PendingState {
    /// The collection has run; contexts are acknowledging completion.
    Finished,
    /// The collection is currently being performed by one thread.
    InProgress,
    /// Waiting for all known contexts to become valid (safepoint/frozen).
    Waiting
}
/// Tracks the progress of a single in-flight collection.
#[derive(Debug)]
pub(crate) struct PendingCollection<C: RawCollectorImpl> {
    /// Current lifecycle state of this collection.
    state: PendingState,
    /// Number of known contexts that must become valid before the
    /// collection can begin (decremented if a context is freed).
    total_contexts: usize,
    /// Number of contexts blocked waiting on this collection; the last
    /// one to acknowledge clears the pending collection.
    waiting_contexts: usize,
    /// Contexts that are valid for collection: stopped at a safepoint
    /// or frozen.
    valid_contexts: Vec<*mut RawContext<C>>,
    /// Unique, monotonically increasing id of this collection.
    id: u64
}
impl<C: RawCollectorImpl> PendingCollection<C> {
pub fn new(
id: u64,
valid_contexts: Vec<*mut RawContext<C>>,
total_contexts: usize,
) -> Self {
PendingCollection {
state: PendingState::Waiting,
total_contexts,
waiting_contexts: 0,
valid_contexts,
id
}
}
#[inline]
pub unsafe fn push_pending_context(
&mut self,
context: *mut RawContext<C>,
) {
debug_assert_ne!((*context).state.get(), ContextState::Active);
assert_eq!(self.state, PendingState::Waiting);
debug_assert!(!self.valid_contexts.contains(&context));
self.valid_contexts.push(context);
assert_eq!(self.state, PendingState::Waiting);
}
}