#![doc = include_str!("../README.md")]
use core::slice;
use std::alloc::{alloc_zeroed, Layout};
use std::any::{Any, TypeId};
use std::cell::{Cell, OnceCell, RefCell, UnsafeCell};
use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, LinkedList, VecDeque};
use std::fmt::{Debug, Display};
use std::hash::Hash;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::num::{
NonZeroI128, NonZeroI16, NonZeroI32, NonZeroI64, NonZeroI8, NonZeroIsize, NonZeroU128,
NonZeroU16, NonZeroU32, NonZeroU64, NonZeroU8, NonZeroUsize,
};
use std::ops::Deref;
use std::rc::Rc;
use std::sync::atomic::{
self, AtomicBool, AtomicI16, AtomicI32, AtomicI64, AtomicI8, AtomicIsize, AtomicU16, AtomicU32,
AtomicU64, AtomicU8, AtomicUsize, Ordering,
};
use std::sync::{Arc, OnceLock, Weak};
use std::time::{Duration, Instant};
use std::{array, thread};
use crossbeam_utils::sync::{Parker, Unparker};
use flume::{Receiver, RecvError, RecvTimeoutError, Sender};
use intentional::{Assert, Cast};
use kempt::map::Field;
use kempt::{Map, Set};
use parking_lot::{Condvar, Mutex, RwLock};
pub use refuse_macros::{MapAs, Trace};
pub mod architecture {}
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Ord, PartialOrd)]
struct CollectorThreadId(u32);
impl CollectorThreadId {
fn unique() -> Self {
static NEXT_ID: AtomicU32 = AtomicU32::new(0);
Self(NEXT_ID.fetch_add(1, Ordering::Release))
}
}
enum CollectorCommand {
NewThread(CollectorThreadId, Arc<UnsafeBins>),
ThreadShutdown(CollectorThreadId),
Collect(Instant),
ScheduleCollect,
}
impl CollectorCommand {
fn send(self) {
GlobalCollector::get()
.sender
.send(self)
.expect("collector not running");
}
fn schedule_collect_if_needed() {
let collector = GlobalCollector::get();
if collector
.info
.signalled_collector
.compare_exchange(false, true, Ordering::Release, Ordering::Relaxed)
.is_ok()
{
collector
.sender
.send(Self::ScheduleCollect)
.expect("collector not running");
}
}
}
type AllThreadBins = Arc<RwLock<Map<CollectorThreadId, Weak<UnsafeBins>>>>;
struct ThreadBins {
alive: bool,
bins: Arc<UnsafeBins>,
}
struct CollectorInfo {
info: Mutex<CollectorInfoData>,
collection_sync: Condvar,
collector_unparker: Unparker,
signalled_collector: AtomicBool,
reader_state: ReaderState,
all_threads: AllThreadBins,
type_indexes: RwLock<Map<TypeId, TypeIndex>>,
}
impl CollectorInfo {
fn wait_for_collection(&self, requested_at: Instant) {
let mut info = self.info.lock();
while info.last_run < requested_at {
self.collection_sync.wait(&mut info);
}
}
}
struct CollectorInfoData {
last_run: Instant,
}
struct CollectorThreadChannels {
tracer: Sender<TraceRequest>,
}
struct TraceRequest {
thread: CollectorThreadId,
bins: Arc<dyn AnyBin>,
mark_one_sender: Arc<Sender<MarkRequest>>,
}
struct MarkRequest {
thread: CollectorThreadId,
type_index: TypeIndex,
slot_generation: u32,
bin_id: BinId,
mark_one_sender: Arc<Sender<MarkRequest>>,
}
struct Collector {
shared: Arc<CollectorInfo>,
receiver: Receiver<CollectorCommand>,
thread_bins: Map<CollectorThreadId, ThreadBins>,
active_threads: usize,
mark_bits: u8,
next_gc: Option<Instant>,
pause_failures: u8,
average_collection: Duration,
average_collection_locking: Duration,
}
impl Collector {
fn new(receiver: Receiver<CollectorCommand>, shared: Arc<CollectorInfo>) -> Self {
Self {
shared,
receiver,
thread_bins: Map::new(),
active_threads: 0,
mark_bits: 0,
next_gc: None,
pause_failures: 0,
average_collection: Duration::from_millis(1),
average_collection_locking: Duration::from_millis(1),
}
}
fn next_command(&self) -> Result<Option<CollectorCommand>, RecvError> {
if let Some(next_gc) = self.next_gc {
match self.receiver.recv_deadline(next_gc) {
Ok(value) => Ok(Some(value)),
Err(RecvTimeoutError::Timeout) => Ok(None),
Err(RecvTimeoutError::Disconnected) => Err(RecvError::Disconnected),
}
} else {
self.receiver.recv().map(Some)
}
}
fn schedule_gc(&mut self, target: Instant) {
if self.next_gc.map_or(true, |next_gc| target < next_gc) {
self.next_gc = Some(target);
}
}
fn run(mut self, parker: &Parker) {
thread::scope(|scope| {
let (tracer, trace_receiver) = flume::bounded(128);
let channels = CollectorThreadChannels { tracer };
let available_parallelism = thread::available_parallelism()
.ok()
.map_or(1, NonZeroUsize::get);
let trace_threads = 1.max(available_parallelism - 1);
for _ in 0..trace_threads {
scope.spawn({
let trace_receiver = trace_receiver.clone();
move || {
while let Ok(request) = trace_receiver.recv() {
request
.bins
.trace(&mut Tracer::new(request.thread, &request.mark_one_sender));
}
}
});
}
loop {
let command = match self.next_command() {
Ok(Some(command)) => command,
Ok(None) => {
self.collect_and_notify(&channels, parker);
continue;
}
Err(_) => break,
};
match command {
CollectorCommand::NewThread(id, bins) => {
let new_thread = self
.thread_bins
.insert(id, ThreadBins { alive: true, bins })
.is_none();
assert!(new_thread);
self.active_threads += 1;
}
CollectorCommand::ThreadShutdown(id) => {
self.thread_bins.get_mut(&id).expect("unknown thread").alive = false;
self.active_threads -= 1;
if self.active_threads == 0 {
self.thread_bins.clear();
}
}
CollectorCommand::Collect(requested_at) => {
let info = self.shared.info.lock();
if info.last_run < requested_at {
drop(info);
self.collect_and_notify(&channels, parker);
}
}
CollectorCommand::ScheduleCollect => {
self.schedule_gc(Instant::now() + self.average_collection * 5);
}
}
}
drop(channels);
});
}
fn collect_and_notify(&mut self, channels: &CollectorThreadChannels, parker: &Parker) {
self.next_gc = None;
let gc_start = Instant::now();
let collect_result = self.collect(channels, parker);
let gc_finish = Instant::now();
let gc_pause = match collect_result {
CollectResult::Ok => {
let elapsed = gc_finish - gc_start;
self.average_collection = (elapsed + self.average_collection * 2) / 3;
if self.thread_bins.is_empty() {
None
} else {
Some((self.average_collection * 100).max(Duration::from_millis(100)))
}
}
CollectResult::CouldntRun => {
self.pause_failures += 1;
Some(self.average_collection)
}
};
if let Some(pause) = gc_pause {
self.schedule_gc(gc_finish + pause);
}
let mut info = self.shared.info.lock();
info.last_run = gc_finish;
drop(info);
self.shared.reader_state.release_write();
self.shared.collection_sync.notify_all();
self.shared
.signalled_collector
.store(false, Ordering::Relaxed);
}
fn acquire_all_locks<'a>(
thread_bins: &'a Map<CollectorThreadId, ThreadBins>,
start: Instant,
average_collection_locking: Duration,
pause_failures: u8,
collector: &CollectorInfo,
parker: &Parker,
) -> Option<Map<CollectorThreadId, &'a mut Bins>> {
let force_gc = pause_failures >= 2;
let lock_wait = average_collection_locking * u32::from(pause_failures + 1) * 3;
let long_lock_deadline = start + lock_wait;
if !collector.reader_state.write() {
while collector.reader_state.readers() > 0 {
if force_gc {
parker.park();
} else {
parker.park_deadline(long_lock_deadline);
if Instant::now() > long_lock_deadline && collector.reader_state.readers() > 0 {
collector.reader_state.release_write();
collector.collection_sync.notify_all();
return None;
}
}
}
}
Some(
thread_bins
.iter()
.map(|entry| (*entry.key(), unsafe { entry.value.bins.assume_mut() }))
.collect(),
)
}
fn collect(&mut self, threads: &CollectorThreadChannels, parker: &Parker) -> CollectResult {
self.mark_bits = self.mark_bits.wrapping_add(1);
if self.mark_bits == 0 {
self.mark_bits = 1;
}
let start = Instant::now();
let Some(mut all_bins) = Self::acquire_all_locks(
&self.thread_bins,
start,
self.average_collection_locking,
self.pause_failures,
&self.shared,
parker,
) else {
self.pause_failures += 1;
return CollectResult::CouldntRun;
};
let locking_time = start.elapsed();
self.average_collection_locking = (locking_time + self.average_collection_locking * 2) / 3;
self.pause_failures = 0;
let (mark_one_sender, mark_ones) = flume::bounded(1024);
let mark_one_sender = Arc::new(mark_one_sender);
for (id, bins) in &mut all_bins {
let by_type = bins.by_type.read();
for i in 0..by_type.len() {
threads
.tracer
.send(TraceRequest {
thread: *id,
bins: by_type.field(i).expect("length checked").value.clone(),
mark_one_sender: mark_one_sender.clone(),
})
.expect("tracer stopped");
}
}
        // Drain mark requests until no tracer holds an outstanding request
        // and the queue is empty.
        loop {
let MarkRequest {
thread,
type_index,
slot_generation,
bin_id,
mark_one_sender,
} = {
if Arc::strong_count(&mark_one_sender) == 1 {
match mark_ones.try_recv().ok() {
Some(msg) => msg,
None => break,
}
} else {
match mark_ones.recv_timeout(Duration::from_micros(1)) {
Ok(msg) => msg,
Err(RecvTimeoutError::Disconnected) => break,
Err(RecvTimeoutError::Timeout) => continue,
}
}
};
let bins = all_bins[&thread]
.by_type
.read()
.get(&type_index)
.expect("areas are never deallocated")
.clone();
if bins.mark_one(self.mark_bits, slot_generation, bin_id) {
bins.trace_one(
slot_generation,
bin_id,
&mut Tracer::new(thread, &mark_one_sender),
);
}
}
atomic::fence(Ordering::Acquire);
let mut threads_to_remove = Vec::new();
for (thread_id, bins) in all_bins.into_iter().map(Field::into_parts) {
let mut live_objects = 0_usize;
for bin in bins.by_type.write().values_mut() {
live_objects = live_objects.saturating_add(bin.sweep(self.mark_bits));
}
if live_objects == 0 {
threads_to_remove.push(thread_id);
}
}
if !threads_to_remove.is_empty() {
let mut all_threads = self.shared.all_threads.write();
for thread_id in threads_to_remove {
if !self.thread_bins[&thread_id].alive {
self.thread_bins.remove(&thread_id);
all_threads.remove(&thread_id);
}
}
}
CollectResult::Ok
}
}
enum CollectResult {
Ok,
CouldntRun,
}
struct GlobalCollector {
sender: Sender<CollectorCommand>,
info: Arc<CollectorInfo>,
}
impl GlobalCollector {
fn get() -> &'static GlobalCollector {
COLLECTOR.get_or_init(|| {
let (sender, receiver) = flume::bounded(1024);
let parker = Parker::new();
let info = Arc::new(CollectorInfo {
info: Mutex::new(CollectorInfoData {
last_run: Instant::now(),
}),
collection_sync: Condvar::new(),
collector_unparker: parker.unparker().clone(),
signalled_collector: AtomicBool::new(false),
reader_state: ReaderState(AtomicUsize::new(0)),
all_threads: AllThreadBins::default(),
type_indexes: RwLock::default(),
});
thread::Builder::new()
.name(String::from("collector"))
.spawn({
let info = info.clone();
move || Collector::new(receiver, info).run(&parker)
})
.expect("error starting collector thread");
GlobalCollector { sender, info }
})
}
}
static COLLECTOR: OnceLock<GlobalCollector> = OnceLock::new();
thread_local! {
static THREAD_POOL: RefCell<OnceCell<ThreadPool>> = const { RefCell::new(OnceCell::new()) };
}
#[derive(Default)]
struct UnsafeBins(UnsafeCell<ManuallyDrop<Bins>>);
impl UnsafeBins {
unsafe fn assume_readable(&self) -> &Bins {
&*self.0.get()
}
#[allow(clippy::mut_from_ref)]
unsafe fn assume_mut(&self) -> &mut Bins {
&mut *self.0.get()
}
}
unsafe impl Send for UnsafeBins {}
unsafe impl Sync for UnsafeBins {}
impl Drop for UnsafeBins {
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(self.0.get_mut());
}
}
}
#[derive(Default)]
struct ThreadPool {
pool: LocalPool,
guard_depth: Rc<Cell<usize>>,
}
impl ThreadPool {
fn map_current<R>(map: impl FnOnce(&Self) -> R) -> R {
THREAD_POOL.with_borrow(|tp| map(tp.get_or_init(ThreadPool::default)))
}
fn get() -> Self {
Self::map_current(Self::clone)
}
fn current_depth() -> usize {
Self::map_current(|tp| tp.guard_depth.get())
}
fn push_thread_guard() -> usize {
Self::map_current(Self::push_guard)
}
fn release_thread_guard() {
Self::map_current(Self::release_guard);
}
fn push_guard(&self) -> usize {
let depth = self.guard_depth.get();
self.guard_depth.set(depth + 1);
depth
}
fn release_guard(&self) {
self.guard_depth.set(self.guard_depth.get() - 1);
}
}
impl Clone for ThreadPool {
fn clone(&self) -> Self {
Self {
pool: LocalPool {
bins: self.pool.bins.clone(),
thread_id: self.pool.thread_id,
all_threads: self.pool.all_threads.clone(),
},
guard_depth: self.guard_depth.clone(),
}
}
}
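/// A pool of garbage collected allocations owned by a single thread.
///
/// [`CollectionGuard::acquire`] uses a thread-local pool automatically; an
/// explicit `LocalPool` can be created and entered when finer control over
/// where values are allocated is needed. A minimal sketch (assuming this
/// crate is built under the name `refuse`):
///
/// ```
/// use refuse::{LocalPool, Ref};
///
/// let pool = LocalPool::default();
/// let guard = pool.enter();
/// let value = Ref::new(7_u8, &guard);
/// assert_eq!(value.load(&guard), Some(&7));
/// ```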
pub struct LocalPool {
bins: Arc<UnsafeBins>,
thread_id: CollectorThreadId,
all_threads: AllThreadBins,
}
impl Default for LocalPool {
fn default() -> Self {
let all_threads = GlobalCollector::get().info.all_threads.clone();
loop {
let thread_id = CollectorThreadId::unique();
let mut threads = all_threads.write();
if let kempt::map::Entry::Vacant(entry) = threads.entry(thread_id) {
let bins = Arc::<UnsafeBins>::default();
CollectorCommand::NewThread(thread_id, bins.clone()).send();
entry.insert(Arc::downgrade(&bins));
drop(threads);
return LocalPool {
bins,
thread_id,
all_threads,
};
}
}
}
}
impl LocalPool {
#[must_use]
pub fn enter(&self) -> CollectionGuard<'_> {
let depth = ThreadPool::push_thread_guard();
let collector = if depth == 0 {
CollectorReadGuard::acquire()
} else {
CollectorReadGuard::acquire_recursive()
};
CollectionGuard {
collector,
thread: Guarded::Local(self),
}
}
}
impl Drop for LocalPool {
fn drop(&mut self) {
        // Two strong references remain when nothing else references this
        // pool's bins: ours and the one the collector received via `NewThread`.
        if Arc::strong_count(&self.bins) == 2 {
CollectorCommand::ThreadShutdown(self.thread_id).send();
}
}
}
struct ReaderState(AtomicUsize);
impl ReaderState {
const COLLECTING_BIT: usize = 1 << (usize::BITS - 1);
fn read(&self) -> bool {
self.0
.fetch_update(Ordering::Release, Ordering::Acquire, |state| {
(state & Self::COLLECTING_BIT == 0).then_some(state + 1)
})
.is_ok()
}
fn read_recursive(&self) {
self.0.fetch_add(1, Ordering::Acquire);
}
fn release_read(&self) -> bool {
self.0.fetch_sub(1, Ordering::Acquire) == (Self::COLLECTING_BIT | 1)
}
fn write(&self) -> bool {
let mut current_readers = 0;
self.0
.fetch_update(Ordering::Release, Ordering::Acquire, |state| {
current_readers = state;
Some(state | Self::COLLECTING_BIT)
})
.expect("error updating reader_state");
current_readers == 0
}
fn release_write(&self) {
self.0
.fetch_update(Ordering::Release, Ordering::Acquire, |state| {
Some(state & !Self::COLLECTING_BIT)
})
.expect("error updating reader_state");
}
fn readers(&self) -> usize {
self.0.load(Ordering::Acquire) & !Self::COLLECTING_BIT
}
fn release_read_if_collecting(&self) -> ReleaseReadResult {
match self
.0
.fetch_update(Ordering::Release, Ordering::Acquire, |state| {
(state & Self::COLLECTING_BIT != 0).then_some(state - 1)
}) {
Ok(previous) if previous == (Self::COLLECTING_BIT | 1) => {
ReleaseReadResult::CollectAndUnpark
}
Ok(_) => ReleaseReadResult::Collect,
Err(_) => ReleaseReadResult::Noop,
}
}
}
enum ReleaseReadResult {
CollectAndUnpark,
Collect,
Noop,
}
struct CollectorReadGuard {
global: &'static CollectorInfo,
}
impl CollectorReadGuard {
fn acquire() -> Self {
let mut this = Self {
global: &GlobalCollector::get().info,
};
this.acquire_reader();
this
}
fn try_acquire() -> Option<Self> {
let global = &GlobalCollector::get().info;
global.reader_state.read().then_some(Self { global })
}
fn acquire_recursive() -> Self {
let this = Self {
global: &GlobalCollector::get().info,
};
this.global.reader_state.read_recursive();
this
}
fn acquire_reader(&mut self) {
if !self.global.reader_state.read() {
let mut info_data = self.global.info.lock();
while !self.global.reader_state.read() {
self.global.collection_sync.wait(&mut info_data);
}
}
}
fn read_recursive(&self) -> Self {
self.global.reader_state.read_recursive();
Self {
global: self.global,
}
}
fn release_reader(&mut self) {
if self.global.reader_state.release_read() {
self.global.collector_unparker.unpark();
}
}
fn release_reader_if_collecting(&mut self) -> bool {
match self.global.reader_state.release_read_if_collecting() {
ReleaseReadResult::CollectAndUnpark => {
self.global.collector_unparker.unpark();
true
}
ReleaseReadResult::Collect => true,
ReleaseReadResult::Noop => false,
}
}
}
impl Drop for CollectorReadGuard {
fn drop(&mut self) {
self.release_reader();
}
}
enum Guarded<'a> {
Local(&'a LocalPool),
Thread(ThreadPool),
}
impl Deref for Guarded<'_> {
type Target = LocalPool;
fn deref(&self) -> &Self::Target {
match self {
Self::Local(value) => value,
Self::Thread(value) => &value.pool,
}
}
}
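/// A guard that provides access to garbage collected values and pauses
/// collection while it exists.
///
/// Allocating and loading values both require a guard. A minimal sketch
/// (assuming this crate is built under the name `refuse`):
///
/// ```
/// use refuse::{CollectionGuard, Ref};
///
/// let guard = CollectionGuard::acquire();
/// let value = Ref::new(42_u32, &guard);
/// assert_eq!(value.load(&guard), Some(&42));
/// ```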
pub struct CollectionGuard<'a> {
collector: CollectorReadGuard,
thread: Guarded<'a>,
}
impl CollectionGuard<'static> {
#[must_use]
pub fn acquire() -> Self {
let thread = ThreadPool::get();
let depth = thread.push_guard();
let collector = if depth == 0 {
CollectorReadGuard::acquire()
} else {
CollectorReadGuard::acquire_recursive()
};
Self {
collector,
thread: Guarded::Thread(thread),
}
}
#[must_use]
pub fn try_acquire() -> Option<Self> {
let thread = ThreadPool::get();
let depth = thread.push_guard();
let collector = if depth == 0 {
CollectorReadGuard::try_acquire()?
} else {
CollectorReadGuard::acquire_recursive()
};
Some(Self {
collector,
thread: Guarded::Thread(thread),
})
}
}
impl CollectionGuard<'_> {
#[allow(clippy::unused_self)]
fn bins_for<'a>(&'a self, bins: &'a UnsafeBins) -> &'a Bins {
unsafe { bins.assume_readable() }
}
fn bins(&self) -> &Bins {
self.bins_for(&self.thread.bins)
}
#[must_use]
pub fn allocating_in<'a>(&self, pool: &'a LocalPool) -> CollectionGuard<'a> {
ThreadPool::push_thread_guard();
CollectionGuard {
collector: self.collector.read_recursive(),
thread: Guarded::Local(pool),
}
}
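    /// Performs a garbage collection, blocking until it completes.
    ///
    /// # Panics
    ///
    /// Panics if this is not the outermost guard held by the current thread;
    /// use [`CollectionGuard::try_collect`] for the fallible variant.
    ///
    /// A sketch of collecting an unrooted value (crate name `refuse` assumed):
    ///
    /// ```
    /// use refuse::{CollectionGuard, Ref};
    ///
    /// let mut guard = CollectionGuard::acquire();
    /// let value = Ref::new(42_u32, &guard);
    /// // Nothing roots `value`, so the collection is expected to free it.
    /// guard.collect();
    /// assert!(value.load(&guard).is_none());
    /// ```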
pub fn collect(&mut self) {
self.try_collect().unwrap();
}
pub fn try_collect(&mut self) -> Result<(), WouldDeadlock> {
self.try_while_unlocked(collect_unchecked)
}
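    /// Yields to the garbage collector if it is waiting to run, blocking until
    /// that collection finishes.
    ///
    /// This is a no-op when no collection is pending or when this is not the
    /// outermost guard held by the current thread.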
    #[allow(clippy::redundant_closure_for_method_calls)]
    pub fn yield_to_collector(&mut self) {
self.coordinated_yield(|yielder| yielder.wait());
}
pub fn coordinated_yield(
&mut self,
yielder: impl FnOnce(Yielder<'_>) -> YieldComplete,
) -> bool {
if ThreadPool::current_depth() == 1 && self.collector.release_reader_if_collecting() {
let _complete: YieldComplete = yielder(Yielder(&mut self.collector));
true
} else {
false
}
}
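    /// Executes `unlocked` while this guard is temporarily released, giving
    /// the collector a chance to run.
    ///
    /// # Panics
    ///
    /// Panics if this is not the outermost guard held by the current thread;
    /// use [`CollectionGuard::try_while_unlocked`] to handle that case without
    /// panicking.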
pub fn while_unlocked<R>(&mut self, unlocked: impl FnOnce() -> R) -> R {
self.try_while_unlocked(unlocked).unwrap()
}
pub fn try_while_unlocked<R>(
&mut self,
unlocked: impl FnOnce() -> R,
) -> Result<R, WouldDeadlock> {
if ThreadPool::current_depth() == 1 {
self.collector.release_reader();
let result = unlocked();
self.collector.acquire_reader();
Ok(result)
} else {
Err(WouldDeadlock)
}
}
fn adopt<T: Collectable>(&self, value: Rooted<T>) -> (TypeIndex, u32, BinId) {
Bins::adopt(value, self)
}
}
impl Drop for CollectionGuard<'_> {
fn drop(&mut self) {
ThreadPool::release_thread_guard();
}
}
impl<'a> AsRef<CollectionGuard<'a>> for CollectionGuard<'a> {
fn as_ref(&self) -> &CollectionGuard<'a> {
self
}
}
impl<'a> AsMut<CollectionGuard<'a>> for CollectionGuard<'a> {
fn as_mut(&mut self) -> &mut CollectionGuard<'a> {
self
}
}
pub struct Yielder<'a>(&'a mut CollectorReadGuard);
impl Yielder<'_> {
#[must_use]
pub fn wait(self) -> YieldComplete {
self.0.acquire_reader();
YieldComplete { _priv: PhantomData }
}
}
pub struct YieldComplete {
_priv: PhantomData<()>,
}
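/// A type that can be stored in the garbage collector.
///
/// This trait is blanket-implemented for every type that is [`Trace`],
/// [`MapAs`], `Send`, `Sync`, and `'static`, typically by deriving `Trace`
/// and `MapAs` or by implementing [`SimpleType`].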
pub trait Collectable: Trace + MapAs + Send + Sync + 'static {}
impl<T> Collectable for T where T: Trace + MapAs + Send + Sync + 'static {}
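/// A type that can report the garbage collected references it contains.
///
/// This is usually derived via the re-exported `Trace` derive macro. A rough
/// sketch of a hand-written implementation for a hypothetical `Entry` type
/// (crate name `refuse` assumed), reporting each contained reference through
/// [`Tracer::mark`]:
///
/// ```
/// use refuse::{Ref, Trace, Tracer};
///
/// struct Entry {
///     value: Ref<u32>,
/// }
///
/// impl Trace for Entry {
///     // This type holds a `Ref`, so the collector must trace into it.
///     const MAY_CONTAIN_REFERENCES: bool = true;
///
///     fn trace(&self, tracer: &mut Tracer) {
///         tracer.mark(self.value);
///     }
/// }
/// ```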
pub trait Trace: Send + Sync + 'static {
const MAY_CONTAIN_REFERENCES: bool;
fn trace(&self, tracer: &mut Tracer);
}
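/// A type that can expose a secondary view of itself when loaded through
/// [`AnyRef::load_mapped`].
///
/// Types with no meaningful mapping can implement [`NoMapping`] (or
/// [`SimpleType`]) instead, which maps to `()`. A sketch for a hypothetical
/// `Message` wrapper exposing its contents as a `str` (crate name `refuse`
/// assumed):
///
/// ```
/// use refuse::MapAs;
///
/// struct Message(String);
///
/// impl MapAs for Message {
///     type Target = str;
///
///     fn map_as(&self) -> &str {
///         &self.0
///     }
/// }
/// ```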
pub trait MapAs: Send + Sync + 'static {
type Target: ?Sized + 'static;
fn map_as(&self) -> &Self::Target;
}
pub trait ContainsNoRefs: Send + Sync + 'static {}
impl<T> Trace for T
where
T: ContainsNoRefs,
{
const MAY_CONTAIN_REFERENCES: bool = false;
fn trace(&self, _tracer: &mut Tracer) {}
}
pub trait NoMapping: Send + Sync + 'static {}
impl<T> MapAs for T
where
T: NoMapping,
{
type Target = ();
fn map_as(&self) -> &Self::Target {
&()
}
}
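/// A marker for plain-data types that contain no garbage collected references
/// and need no [`MapAs`] mapping.
///
/// Implementing it opts into the blanket [`Trace`] and [`MapAs`]
/// implementations. A sketch for a hypothetical `Vector2` type (crate name
/// `refuse` assumed):
///
/// ```
/// use refuse::SimpleType;
///
/// struct Vector2 {
///     x: f32,
///     y: f32,
/// }
///
/// impl SimpleType for Vector2 {}
/// ```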
pub trait SimpleType: Send + Sync + 'static {}
impl<T> NoMapping for T where T: SimpleType {}
impl<T> ContainsNoRefs for T where T: SimpleType {}
macro_rules! impl_simple_type {
($($ty:ty),+ ,) => {
$(impl SimpleType for $ty {})+
}
}
impl_simple_type!(
u8,
u16,
u32,
u64,
u128,
usize,
i8,
i16,
i32,
i64,
i128,
isize,
AtomicU8,
AtomicU16,
AtomicU32,
AtomicU64,
AtomicUsize,
AtomicI8,
AtomicI16,
AtomicI32,
AtomicI64,
AtomicIsize,
NonZeroU8,
NonZeroU16,
NonZeroU32,
NonZeroU64,
NonZeroU128,
NonZeroUsize,
NonZeroI8,
NonZeroI16,
NonZeroI32,
NonZeroI64,
NonZeroI128,
NonZeroIsize,
String,
);
impl<T> Trace for Vec<T>
where
T: Trace,
{
const MAY_CONTAIN_REFERENCES: bool = T::MAY_CONTAIN_REFERENCES;
fn trace(&self, tracer: &mut Tracer) {
for item in self {
item.trace(tracer);
}
}
}
impl<T> NoMapping for Vec<T> where T: Send + Sync + 'static {}
impl<T> Trace for VecDeque<T>
where
T: Trace,
{
const MAY_CONTAIN_REFERENCES: bool = T::MAY_CONTAIN_REFERENCES;
fn trace(&self, tracer: &mut Tracer) {
for item in self {
item.trace(tracer);
}
}
}
impl<T> NoMapping for VecDeque<T> where T: Send + Sync + 'static {}
impl<T> Trace for BinaryHeap<T>
where
T: Trace,
{
const MAY_CONTAIN_REFERENCES: bool = T::MAY_CONTAIN_REFERENCES;
fn trace(&self, tracer: &mut Tracer) {
for item in self {
item.trace(tracer);
}
}
}
impl<T> NoMapping for BinaryHeap<T> where T: Send + Sync + 'static {}
impl<T> Trace for LinkedList<T>
where
T: Trace,
{
const MAY_CONTAIN_REFERENCES: bool = T::MAY_CONTAIN_REFERENCES;
fn trace(&self, tracer: &mut Tracer) {
for item in self {
item.trace(tracer);
}
}
}
impl<T> NoMapping for LinkedList<T> where T: Send + Sync + 'static {}
impl<K, V, S> Trace for HashMap<K, V, S>
where
K: Trace,
V: Trace,
S: Send + Sync + 'static,
{
const MAY_CONTAIN_REFERENCES: bool = K::MAY_CONTAIN_REFERENCES || V::MAY_CONTAIN_REFERENCES;
fn trace(&self, tracer: &mut Tracer) {
for (k, v) in self {
k.trace(tracer);
v.trace(tracer);
}
}
}
impl<K, V, S> NoMapping for HashMap<K, V, S>
where
K: Send + Sync + 'static,
V: Send + Sync + 'static,
S: Send + Sync + 'static,
{
}
impl<K, S> Trace for HashSet<K, S>
where
K: Trace,
S: Send + Sync + 'static,
{
const MAY_CONTAIN_REFERENCES: bool = K::MAY_CONTAIN_REFERENCES;
fn trace(&self, tracer: &mut Tracer) {
for k in self {
k.trace(tracer);
}
}
}
impl<K, S> NoMapping for HashSet<K, S>
where
K: Send + Sync + 'static,
S: Send + Sync + 'static,
{
}
impl<K, V> Trace for BTreeMap<K, V>
where
K: Trace,
V: Trace,
{
const MAY_CONTAIN_REFERENCES: bool = K::MAY_CONTAIN_REFERENCES || V::MAY_CONTAIN_REFERENCES;
fn trace(&self, tracer: &mut Tracer) {
for (k, v) in self {
k.trace(tracer);
v.trace(tracer);
}
}
}
impl<K, V> NoMapping for BTreeMap<K, V>
where
K: Send + Sync + 'static,
V: Send + Sync + 'static,
{
}
impl<K> Trace for BTreeSet<K>
where
K: Trace,
{
const MAY_CONTAIN_REFERENCES: bool = K::MAY_CONTAIN_REFERENCES;
fn trace(&self, tracer: &mut Tracer) {
for k in self {
k.trace(tracer);
}
}
}
impl<K> NoMapping for BTreeSet<K> where K: Send + Sync + 'static {}
impl<K, V> Trace for Map<K, V>
where
K: Trace + kempt::Sort,
V: Trace,
{
const MAY_CONTAIN_REFERENCES: bool = K::MAY_CONTAIN_REFERENCES || V::MAY_CONTAIN_REFERENCES;
fn trace(&self, tracer: &mut Tracer) {
for field in self {
field.key().trace(tracer);
field.value.trace(tracer);
}
}
}
impl<K, V> NoMapping for Map<K, V>
where
K: kempt::Sort + Send + Sync + 'static,
V: Send + Sync + 'static,
{
}
impl<K> Trace for Set<K>
where
K: Trace + kempt::Sort,
{
const MAY_CONTAIN_REFERENCES: bool = K::MAY_CONTAIN_REFERENCES;
fn trace(&self, tracer: &mut Tracer) {
for k in self {
k.trace(tracer);
}
}
}
impl<K> NoMapping for Set<K> where K: kempt::Sort + Send + Sync + 'static {}
impl<T, const N: usize> Trace for [T; N]
where
T: Trace,
{
const MAY_CONTAIN_REFERENCES: bool = T::MAY_CONTAIN_REFERENCES;
fn trace(&self, tracer: &mut Tracer) {
for item in self {
item.trace(tracer);
}
}
}
impl<T, const N: usize> NoMapping for [T; N] where T: Send + Sync + 'static {}
impl<T> Trace for Root<T>
where
T: Collectable,
{
const MAY_CONTAIN_REFERENCES: bool = false;
fn trace(&self, _tracer: &mut Tracer) {
}
}
impl<T> Trace for Ref<T>
where
T: Collectable,
{
const MAY_CONTAIN_REFERENCES: bool = true;
fn trace(&self, tracer: &mut Tracer) {
tracer.mark(*self);
}
}
impl<T> AsRef<Ref<T>> for Ref<T> {
fn as_ref(&self) -> &Ref<T> {
self
}
}
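/// A tracer for the garbage collector.
///
/// A value of this type is passed to [`Trace::trace`] implementations, which
/// report each garbage collected reference they contain via [`Tracer::mark`].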
pub struct Tracer<'a> {
tracing_thread: CollectorThreadId,
mark_one_sender: &'a Arc<Sender<MarkRequest>>,
}
impl<'a> Tracer<'a> {
fn new(thread: CollectorThreadId, mark_one_sender: &'a Arc<Sender<MarkRequest>>) -> Self {
Self {
tracing_thread: thread,
mark_one_sender,
}
}
pub fn mark(&mut self, collectable: impl Into<AnyRef>) {
let collectable = collectable.into();
self.mark_one_sender
.send(MarkRequest {
thread: collectable.creating_thread,
type_index: collectable.type_index,
slot_generation: collectable.slot_generation,
bin_id: collectable.bin_id,
mark_one_sender: self.mark_one_sender.clone(),
})
.assert("marker thread not running");
}
}
#[test]
fn size_of_types() {
assert_eq!(std::mem::size_of::<Root<u32>>(), 24);
assert_eq!(std::mem::size_of::<Ref<u32>>(), 16);
assert_eq!(std::mem::size_of::<AnyRef>(), 16);
}
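/// A rooted reference to a garbage collected value.
///
/// A `Root<T>` keeps its value alive across collections; once the last root
/// is dropped, the value becomes eligible for collection. A minimal sketch
/// (crate name `refuse` assumed):
///
/// ```
/// use refuse::{CollectionGuard, Root};
///
/// let guard = CollectionGuard::acquire();
/// let root = Root::new(String::from("hello"), &guard);
/// // A root dereferences directly to its value without needing the guard.
/// assert_eq!(&*root, "hello");
/// // A weak `Ref` to the same allocation can be created at any time.
/// let weak = root.downgrade();
/// assert_eq!(weak.load(&guard).map(String::as_str), Some("hello"));
/// ```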
pub struct Root<T>
where
T: Collectable,
{
data: *const Rooted<T>,
reference: Ref<T>,
}
impl<T> Root<T>
where
T: Collectable,
{
fn from_parts(
type_index: TypeIndex,
slot_generation: u32,
bin_id: BinId,
guard: &CollectionGuard<'_>,
) -> Self {
let data = unsafe { guard.bins().allocated_slot_pointer::<T>(type_index, bin_id) };
Self {
data,
reference: Ref {
any: AnyRef {
type_index,
creating_thread: guard.thread.thread_id,
slot_generation,
bin_id,
},
_t: PhantomData,
},
}
}
pub fn new<'a>(value: T, guard: &impl AsRef<CollectionGuard<'a>>) -> Self {
let guard = guard.as_ref();
let (type_index, gen, bin) = guard.adopt(Rooted::root(value));
Self::from_parts(type_index, gen, bin, guard)
}
pub fn try_from_any<'a>(
root: AnyRoot,
guard: &impl AsRef<CollectionGuard<'a>>,
) -> Result<Self, AnyRoot> {
if TypeIndex::of::<T>() == root.any.type_index {
            let slot = root.any.load_slot(guard.as_ref()).assert("root missing");
            // Transfer `root`'s count to the returned `Root` instead of
            // letting `AnyRoot::drop` release it.
            let root = ManuallyDrop::new(root);
            Ok(Self {
                data: slot,
                reference: root.any.downcast_ref(),
            })
} else {
Err(root)
}
}
#[must_use]
pub fn root_count(&self) -> u64 {
self.as_rooted().roots.load(Ordering::Acquire)
}
#[must_use]
pub const fn downgrade(&self) -> Ref<T> {
self.reference
}
#[must_use]
pub const fn downgrade_any(&self) -> AnyRef {
self.reference.as_any()
}
#[must_use]
pub fn to_any_root(&self) -> AnyRoot {
let roots = &self.as_rooted().roots;
roots.fetch_add(1, Ordering::Acquire);
AnyRoot {
rooted: self.data.cast(),
roots,
any: self.reference.as_any(),
}
}
#[must_use]
pub fn into_any_root(self) -> AnyRoot {
let this = ManuallyDrop::new(self);
AnyRoot {
rooted: this.data.cast(),
roots: &this.as_rooted().roots,
any: this.reference.as_any(),
}
}
fn as_rooted(&self) -> &Rooted<T> {
unsafe { &(*self.data) }
}
#[must_use]
pub fn ptr_eq(this: &Self, other: &Self) -> bool {
this.reference == other.reference
}
}
impl<T> Debug for Root<T>
where
T: Collectable + Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Debug::fmt(&**self, f)
}
}
impl<T> Clone for Root<T>
where
T: Collectable,
{
fn clone(&self) -> Self {
self.as_rooted().roots.fetch_add(1, Ordering::Acquire);
Self {
data: self.data,
reference: self.reference,
}
}
}
impl<T> AsRef<Ref<T>> for Root<T>
where
T: Collectable,
{
fn as_ref(&self) -> &Ref<T> {
&self.reference
}
}
unsafe impl<T> Send for Root<T> where T: Collectable {}
unsafe impl<T> Sync for Root<T> where T: Collectable {}
impl<T> Deref for Root<T>
where
T: Collectable,
{
type Target = T;
fn deref(&self) -> &Self::Target {
&self.as_rooted().value
}
}
impl<T> Drop for Root<T>
where
T: Collectable,
{
fn drop(&mut self) {
if self.as_rooted().roots.fetch_sub(1, Ordering::Acquire) == 1 {
CollectorCommand::schedule_collect_if_needed();
}
}
}
impl<T> Eq for Root<T> where T: Collectable + Eq {}
impl<T> PartialEq for Root<T>
where
T: Collectable + PartialEq,
{
fn eq(&self, other: &Self) -> bool {
self.reference == other.reference || **self == **other
}
}
impl<T> PartialEq<Ref<T>> for Root<T>
where
T: Collectable,
{
fn eq(&self, other: &Ref<T>) -> bool {
self.reference == *other
}
}
impl<T> PartialEq<&'_ Ref<T>> for Root<T>
where
T: Collectable,
{
fn eq(&self, other: &&'_ Ref<T>) -> bool {
self == *other
}
}
impl<T> PartialEq<AnyRef> for Root<T>
where
T: Collectable,
{
fn eq(&self, other: &AnyRef) -> bool {
self.reference == *other
}
}
impl<T> PartialEq<&'_ AnyRef> for Root<T>
where
T: Collectable,
{
fn eq(&self, other: &&'_ AnyRef) -> bool {
self == *other
}
}
impl<T> Hash for Root<T>
where
T: Collectable + Hash,
{
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
(**self).hash(state);
}
}
impl<T> Ord for Root<T>
where
T: Collectable + Ord,
{
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
(**self).cmp(&**other)
}
}
impl<T> PartialOrd for Root<T>
where
T: Collectable + PartialOrd,
{
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
(**self).partial_cmp(&**other)
}
}
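/// A weak reference to a garbage collected value.
///
/// A `Ref<T>` does not keep its value alive; loading requires a
/// [`CollectionGuard`] and returns `None` once the value has been collected.
/// A minimal sketch (crate name `refuse` assumed):
///
/// ```
/// use refuse::{CollectionGuard, Ref};
///
/// let mut guard = CollectionGuard::acquire();
/// let value = Ref::new(1_u64, &guard);
/// assert_eq!(value.load(&guard), Some(&1));
/// // Upgrading to a `Root` keeps the value alive across collections.
/// let root = value.as_root(&guard).expect("still allocated");
/// guard.collect();
/// assert_eq!(value.load(&guard), Some(&1));
/// drop(root);
/// ```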
pub struct Ref<T> {
any: AnyRef,
_t: PhantomData<fn(&T)>,
}
impl<T> Ref<T>
where
T: Collectable,
{
pub fn new<'a>(value: T, guard: &impl AsRef<CollectionGuard<'a>>) -> Self {
let guard = guard.as_ref();
let (type_index, slot_generation, bin_id) = guard.adopt(Rooted::reference(value));
Self {
any: AnyRef {
type_index,
creating_thread: guard.thread.thread_id,
slot_generation,
bin_id,
},
_t: PhantomData,
}
}
#[must_use]
pub const fn as_any(self) -> AnyRef {
self.any
}
fn load_slot<'guard>(&self, guard: &'guard CollectionGuard<'_>) -> Option<&'guard Rooted<T>> {
self.any.load_slot(guard)
}
#[must_use]
pub fn load<'guard>(&self, guard: &'guard CollectionGuard<'_>) -> Option<&'guard T> {
self.load_slot(guard).map(|allocated| &allocated.value)
}
#[must_use]
pub fn as_root(&self, guard: &CollectionGuard<'_>) -> Option<Root<T>> {
self.load_slot(guard).map(|allocated| {
allocated.roots.fetch_add(1, Ordering::Acquire);
Root {
data: allocated,
reference: *self,
}
})
}
}
impl<T> Eq for Ref<T> {}
impl<T> PartialEq for Ref<T> {
fn eq(&self, other: &Self) -> bool {
self.any == other.any
}
}
impl<T> Ord for Ref<T> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.any.cmp(&other.any)
}
}
impl<T> PartialOrd for Ref<T> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.any.cmp(&other.any))
}
}
impl<T> Clone for Ref<T> {
fn clone(&self) -> Self {
*self
}
}
impl<T> Copy for Ref<T> {}
impl<T> Hash for Ref<T> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.any.hash(state);
}
}
impl<T> PartialEq<Root<T>> for Ref<T>
where
T: Collectable,
{
fn eq(&self, other: &Root<T>) -> bool {
*self == other.reference
}
}
impl<T> PartialEq<&'_ Root<T>> for Ref<T>
where
T: Collectable,
{
fn eq(&self, other: &&'_ Root<T>) -> bool {
self == *other
}
}
impl<T> PartialEq<AnyRef> for Ref<T>
where
T: Collectable,
{
fn eq(&self, other: &AnyRef) -> bool {
self.any == *other
}
}
impl<T> PartialEq<&'_ AnyRef> for Ref<T>
where
T: Collectable,
{
fn eq(&self, other: &&'_ AnyRef) -> bool {
self == *other
}
}
impl<T> Debug for Ref<T>
where
T: Collectable + Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let guard = CollectionGuard::acquire();
if let Some(contents) = self.load(&guard) {
Debug::fmt(contents, f)
} else {
f.debug_tuple("Ref").field(&"<freed>").finish()
}
}
}
unsafe impl<T> Send for Ref<T> where T: Send {}
unsafe impl<T> Sync for Ref<T> where T: Sync {}
#[derive(Default)]
struct Bins {
by_type: RwLock<Map<TypeIndex, Arc<dyn AnyBin>>>,
}
impl Bins {
unsafe fn allocated_slot_pointer<T>(
&self,
type_index: TypeIndex,
bin_id: BinId,
) -> *const Rooted<T>
where
T: Collectable,
{
let by_type = self.by_type.read();
let slot = &by_type
.get(&type_index)
.expect("areas are never deallocated")
.as_any()
.downcast_ref::<Bin<T>>()
.expect("type mismatch")
.slabs[bin_id.slab() as usize]
.slots[usize::from(bin_id.slot())];
&*(*slot.value.get()).allocated
}
fn adopt<T>(value: Rooted<T>, bins_guard: &CollectionGuard<'_>) -> (TypeIndex, u32, BinId)
where
T: Collectable,
{
let type_id = TypeIndex::of::<T>();
let mut by_type = bins_guard.bins().by_type.upgradable_read();
if let Some(bin) = by_type.get(&type_id) {
let (gen, bin) = bin
.as_any()
.downcast_ref::<Bin<T>>()
.expect("type mismatch")
.adopt(value);
(type_id, gen, bin)
} else {
by_type.with_upgraded(|by_type| {
let bin = Bin::new(value, type_id, bins_guard.thread.thread_id);
by_type.insert(type_id, Arc::new(bin));
(type_id, 0, BinId::first())
})
}
}
}
struct Bin<T>
where
T: Collectable,
{
thread_id: CollectorThreadId,
type_index: TypeIndex,
free_head: AtomicU32,
slabs: Slabs<T>,
slabs_tail: Cell<Option<*const Slabs<T>>>,
mapper: Mapper<T::Target>,
}
impl<T> Bin<T>
where
T: Collectable,
{
fn new(first_value: Rooted<T>, type_index: TypeIndex, thread_id: CollectorThreadId) -> Self {
Self {
thread_id,
type_index,
free_head: AtomicU32::new(0),
slabs: Slabs::new(first_value, 0),
slabs_tail: Cell::new(None),
mapper: Mapper(Box::new(MappingFunction::<T>::new())),
}
}
fn adopt(&self, value: Rooted<T>) -> (u32, BinId) {
let mut value = Some(value);
loop {
let bin_id = BinId(self.free_head.load(Ordering::Acquire));
if bin_id.invalid() {
break;
}
let slab = &self.slabs[bin_id.slab() as usize];
let slot_index = bin_id.slot();
let slot = &slab.slots[usize::from(slot_index)];
if let Some((generation, next)) = slot.state.try_allocate(|| unsafe {
let next = (*slot.value.get()).free;
slot.value.get().write(SlotData {
allocated: ManuallyDrop::new(value.take().expect("only taken once")),
});
next
}) {
self.free_head.store(next, Ordering::Release);
let _result = slab.last_allocated.fetch_update(
Ordering::Release,
Ordering::Acquire,
|last_allocated| (last_allocated < slot_index).then_some(slot_index),
);
return (generation, bin_id);
}
}
let tail = if let Some(tail) = self.slabs_tail.get() {
unsafe { &*tail }
} else {
&self.slabs
};
let (generation, bin_id, new_tail) = tail.adopt(value.take().expect("only taken once"));
if new_tail.is_some() {
self.slabs_tail.set(new_tail);
}
(generation, bin_id)
}
fn load<'guard>(
&self,
bin_id: BinId,
slot_generation: u32,
_guard: &'guard CollectionGuard<'_>,
) -> Option<&'guard Rooted<T>> {
let slab = self.slabs.get(bin_id.slab() as usize)?;
let slot = &slab.slots[usize::from(bin_id.slot())];
        slot.state
            .allocated_with_generation(slot_generation)
            .then_some(unsafe { &(*slot.value.get()).allocated })
}
}
trait AnyBin: Send + Sync {
fn trace(&self, tracer: &mut Tracer<'_>);
fn trace_one(&self, slot_generation: u32, bin: BinId, tracer: &mut Tracer<'_>);
fn mark_one(&self, mark_bits: u8, slot_generation: u32, bin: BinId) -> bool;
fn sweep(&self, mark_bits: u8) -> usize;
fn as_any(&self) -> &dyn Any;
fn mapper(&self) -> &dyn Any;
fn load_root(
&self,
bin_id: BinId,
slot_generation: u32,
guard: &CollectionGuard<'_>,
) -> Option<AnyRoot>;
}
impl<T> AnyBin for Bin<T>
where
T: Collectable,
{
fn trace(&self, tracer: &mut Tracer) {
for (slab_index, slab) in self.slabs.iter().enumerate() {
for (index, slot) in slab.slots.iter().enumerate() {
let Some(slot_generation) = slot.state.generation() else {
continue;
};
let root_count =
unsafe { (*slot.value.get()).allocated.roots.load(Ordering::Relaxed) };
if root_count > 0 {
tracer.mark(AnyRef {
type_index: self.type_index,
creating_thread: tracer.tracing_thread,
slot_generation,
bin_id: BinId::new(slab_index.cast::<u32>(), index.cast::<u8>()),
});
}
}
}
}
fn trace_one(&self, slot_generation: u32, bin: BinId, tracer: &mut Tracer) {
let slot = &self.slabs[bin.slab() as usize].slots[usize::from(bin.slot())];
if slot.state.generation() == Some(slot_generation) {
unsafe {
(*slot.value.get()).allocated.value.trace(tracer);
}
}
}
fn mark_one(&self, mark_bits: u8, slot_generation: u32, bin: BinId) -> bool {
let slot = &self.slabs[bin.slab() as usize].slots[usize::from(bin.slot())];
slot.state.mark(mark_bits, slot_generation) && T::MAY_CONTAIN_REFERENCES
}
fn sweep(&self, mark_bits: u8) -> usize {
let mut free_head = BinId(self.free_head.load(Ordering::Acquire));
let mut allocated = 0;
for (slab_index, slab) in self.slabs.iter().enumerate() {
let current_last_allocated = slab.last_allocated.load(Ordering::Acquire);
let mut last_allocated = 0;
for (slot_index, slot) in slab.slots[0..=usize::from(current_last_allocated)]
.iter()
.enumerate()
{
match slot.sweep(mark_bits, free_head) {
SlotSweepStatus::Swept => {
free_head = BinId::new(slab_index.cast::<u32>(), slot_index.cast::<u8>());
}
SlotSweepStatus::Allocated => {
allocated += 1;
last_allocated = slot_index.cast::<u8>();
}
SlotSweepStatus::NotAllocated => {}
}
}
if last_allocated < current_last_allocated {
slab.last_allocated.store(last_allocated, Ordering::Release);
}
}
self.free_head.store(free_head.0, Ordering::Release);
allocated
}
fn as_any(&self) -> &dyn Any {
self
}
fn mapper(&self) -> &dyn Any {
&self.mapper
}
fn load_root(
&self,
bin_id: BinId,
slot_generation: u32,
_guard: &CollectionGuard<'_>,
) -> Option<AnyRoot> {
let slab = self.slabs.get(bin_id.slab() as usize)?;
let slot = &slab.slots[usize::from(bin_id.slot())];
if slot.state.allocated_with_generation(slot_generation) {
let slot = unsafe { &(*slot.value.get()).allocated };
slot.roots.fetch_add(1, Ordering::Relaxed);
Some(AnyRoot {
rooted: std::ptr::addr_of!(**slot).cast::<()>(),
roots: &slot.roots,
any: AnyRef {
bin_id,
creating_thread: self.thread_id,
type_index: self.type_index,
slot_generation,
},
})
} else {
None
}
}
}
struct Mapper<T>(Box<dyn MapFn<T>>)
where
T: ?Sized + 'static;
trait MapFn<T>
where
T: ?Sized + 'static,
{
fn load_mapped<'guard>(
&self,
id: BinId,
slot_generation: u32,
bin: &dyn AnyBin,
guard: &'guard CollectionGuard<'_>,
) -> Option<&'guard T>;
}
struct MappingFunction<C: Collectable>(PhantomData<C>);
impl<C> MappingFunction<C>
where
C: Collectable,
{
const fn new() -> Self {
Self(PhantomData)
}
}
impl<C> MapFn<C::Target> for MappingFunction<C>
where
C: Collectable,
{
fn load_mapped<'guard>(
&self,
id: BinId,
slot_generation: u32,
bin: &dyn AnyBin,
guard: &'guard CollectionGuard<'_>,
) -> Option<&'guard C::Target> {
let ref_counted =
bin.as_any()
.downcast_ref::<Bin<C>>()?
.load(id, slot_generation, guard)?;
Some(ref_counted.value.map_as())
}
}
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)]
struct BinId(u32);
impl BinId {
const fn new(slab: u32, slot: u8) -> Self {
        assert!(slab < 0xFF_FFFF);
        // Slab indexes are stored offset by one so that the all-zero id can
        // serve as the invalid/empty marker for the free list.
        Self((slab + 1) << 8 | slot as u32)
}
const fn first() -> Self {
Self::new(0, 0)
}
const fn invalid(self) -> bool {
self.0 == 0
}
const fn slab(self) -> u32 {
(self.0 >> 8) - 1
}
    #[allow(clippy::cast_possible_truncation)]
    const fn slot(self) -> u8 {
self.0 as u8
}
}
struct Slabs<T> {
offset: usize,
first_free_slab: Cell<usize>,
slab_slots: [UnsafeCell<Option<Box<Slab<T>>>>; 256],
next: UnsafeCell<Option<Box<Slabs<T>>>>,
}
impl<T> Slabs<T>
where
T: Collectable,
{
fn new(initial_value: Rooted<T>, offset: usize) -> Self {
let mut initial_value = Some(initial_value);
Self {
offset,
first_free_slab: Cell::new(0),
slab_slots: array::from_fn(|index| {
UnsafeCell::new(if index == 0 {
Some(Slab::new(initial_value.take().expect("only called once")))
} else {
None
})
}),
next: UnsafeCell::new(None),
}
}
fn get(&self, index: usize) -> Option<&Slab<T>> {
if index < 256 {
unsafe { (*self.slab_slots[index].get()).as_ref() }.map(|slab| &**slab)
} else {
unsafe { (*self.next.get()).as_ref() }.and_then(|slabs| slabs.get(index - 256))
}
}
fn adopt(&self, mut value: Rooted<T>) -> (u32, BinId, Option<*const Slabs<T>>) {
let first_free = self.first_free_slab.get();
for index in first_free..256 {
let slab = unsafe { &mut (*self.slab_slots[index].get()) };
if let Some(slab) = slab {
match slab.try_adopt(value, index + self.offset) {
Ok((gen, bin)) => {
if first_free < index {
self.first_free_slab.set(index);
}
return (gen, bin, None);
}
Err(returned) => value = returned,
}
} else {
*slab = Some(Slab::new(value));
                return (0, BinId::new((index + self.offset).cast::<u32>(), 0), None);
}
}
if let Some(next) = unsafe { &*self.next.get() } {
next.adopt(value)
} else {
let slabs = Box::new(Slabs::new(value, self.offset + 256));
let new_tail: *const Slabs<T> = &*slabs;
unsafe { self.next.get().write(Some(slabs)) };
(
0,
BinId::new(self.offset.cast::<u32>() + 256, 0),
Some(new_tail),
)
}
}
fn iter(&self) -> SlabsIter<'_, T> {
SlabsIter {
slabs: self.slab_slots.iter(),
this: self,
}
}
}
impl<T> std::ops::Index<usize> for Slabs<T>
where
T: Collectable,
{
type Output = Slab<T>;
fn index(&self, index: usize) -> &Self::Output {
self.get(index).expect("out of bounds")
}
}
struct SlabsIter<'a, T> {
slabs: slice::Iter<'a, UnsafeCell<Option<Box<Slab<T>>>>>,
this: &'a Slabs<T>,
}
impl<'a, T> Iterator for SlabsIter<'a, T> {
type Item = &'a Slab<T>;
fn next(&mut self) -> Option<Self::Item> {
loop {
for slab in &mut self.slabs {
let Some(slab) = (unsafe { (*slab.get()).as_ref() }) else {
continue;
};
return Some(slab);
}
            self.this = unsafe { (*self.this.next.get()).as_ref() }?;
            self.slabs = self.this.slab_slots.iter();
}
}
}
struct Slab<T> {
slots: [Slot<T>; 256],
last_allocated: AtomicU8,
}
impl<T> Slab<T> {
fn new(first_value: Rooted<T>) -> Box<Self>
where
T: Collectable,
{
let mut this: Box<Self> =
unsafe { Box::from_raw(alloc_zeroed(Layout::new::<Self>()).cast()) };
this.slots[0] = Slot {
state: SlotState::new_allocated(),
value: UnsafeCell::new(SlotData {
allocated: ManuallyDrop::new(first_value),
}),
};
this
}
fn try_adopt(&self, value: Rooted<T>, slab_index: usize) -> Result<(u32, BinId), Rooted<T>> {
if let Ok(last_allocated) = self.last_allocated.fetch_update(
Ordering::Release,
Ordering::Acquire,
|last_allocated| last_allocated.checked_add(1),
) {
let slot_index = last_allocated + 1;
if slot_index == u8::MAX {
CollectorCommand::schedule_collect_if_needed();
}
let slot = &self.slots[usize::from(slot_index)];
let generation = slot.allocate(value);
Ok((generation, BinId::new(slab_index.cast::<u32>(), slot_index)))
} else {
Err(value)
}
}
}
union SlotData<T> {
allocated: ManuallyDrop<Rooted<T>>,
free: u32,
}
struct Slot<T> {
state: SlotState,
value: UnsafeCell<SlotData<T>>,
}
impl<T> Slot<T> {
fn allocate(&self, value: Rooted<T>) -> u32 {
let generation = self.state.allocate();
unsafe {
self.value.get().write(SlotData {
allocated: ManuallyDrop::new(value),
});
}
generation
}
fn sweep(&self, mark_bits: u8, free_head: BinId) -> SlotSweepStatus {
match self.state.sweep(mark_bits) {
SlotSweepStatus::Swept => {
unsafe {
ManuallyDrop::drop(&mut (*self.value.get()).allocated);
self.value.get().write(SlotData { free: free_head.0 });
}
SlotSweepStatus::Swept
}
other => other,
}
}
}
unsafe impl<T> Send for Bin<T> where T: Collectable {}
unsafe impl<T> Sync for Bin<T> where T: Collectable {}
struct Rooted<T> {
roots: AtomicU64,
value: T,
}
impl<T> Rooted<T> {
fn reference(value: T) -> Self {
Self {
roots: AtomicU64::new(0),
value,
}
}
fn root(value: T) -> Self {
Self {
roots: AtomicU64::new(1),
value,
}
}
}
struct SlotState(AtomicU64);
impl SlotState {
    // Bits 0..32 hold the slot's generation, bit 33 flags the slot as
    // allocated, and the bits starting at `MARK_OFFSET` hold the last mark.
    const ALLOCATED: u64 = 1 << 33;
    const MARK_OFFSET: u32 = 34;
const fn new_allocated() -> Self {
Self(AtomicU64::new(Self::ALLOCATED))
}
fn generation(&self) -> Option<u32> {
let state = self.0.load(Ordering::Relaxed);
(state & Self::ALLOCATED != 0).then(|| state.cast::<u32>())
}
fn allocated_with_generation(&self, generation: u32) -> bool {
let state = self.0.load(Ordering::Relaxed);
state & Self::ALLOCATED != 0 && state.cast::<u32>() == generation
}
fn try_allocate<R>(&self, allocated: impl FnOnce() -> R) -> Option<(u32, R)> {
let state = self.0.load(Ordering::Acquire);
if state & Self::ALLOCATED != 0 {
return None;
}
let result = allocated();
let generation = state.cast::<u32>().wrapping_add(1);
self.0
.store(Self::ALLOCATED | u64::from(generation), Ordering::Release);
Some((generation, result))
}
fn allocate(&self) -> u32 {
let state = self.0.load(Ordering::Acquire);
debug_assert_eq!(state & Self::ALLOCATED, 0);
let generation = state.cast::<u32>().wrapping_add(1);
self.0
.store(Self::ALLOCATED | u64::from(generation), Ordering::Release);
generation
}
fn mark(&self, mark_bits: u8, slot_generation: u32) -> bool {
let mut state = self.0.load(Ordering::Acquire);
if state & Self::ALLOCATED == 0 || state.cast::<u32>() != slot_generation {
return false;
}
let mark_bits = u64::from(mark_bits);
let current_mark = state >> Self::MARK_OFFSET;
if current_mark == mark_bits {
return false;
}
state &= !(0xFF << Self::MARK_OFFSET);
state |= mark_bits << Self::MARK_OFFSET;
self.0.store(state, Ordering::Release);
true
}
fn sweep(&self, mark_bits: u8) -> SlotSweepStatus {
let mark_bits = u64::from(mark_bits);
let state = self.0.load(Ordering::Acquire);
let current_mark = state >> Self::MARK_OFFSET;
if state & Self::ALLOCATED == 0 {
return SlotSweepStatus::NotAllocated;
} else if current_mark == mark_bits {
return SlotSweepStatus::Allocated;
}
let generation = state.cast::<u32>();
self.0.store(u64::from(generation), Ordering::Release);
SlotSweepStatus::Swept
}
}
#[derive(Clone, Copy)]
enum SlotSweepStatus {
Allocated,
NotAllocated,
Swept,
}
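/// Invokes the garbage collector and waits for the collection to finish.
///
/// # Panics
///
/// Panics if the current thread holds a [`CollectionGuard`], because the
/// collector would deadlock waiting for it to be released. Use
/// [`CollectionGuard::collect`] or [`try_collect`] in that situation.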
pub fn collect() {
try_collect().unwrap();
}
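/// Invokes the garbage collector and waits for the collection to finish,
/// returning [`WouldDeadlock`] instead of blocking if the current thread
/// holds a [`CollectionGuard`].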
pub fn try_collect() -> Result<(), WouldDeadlock> {
if ThreadPool::current_depth() > 0 {
return Err(WouldDeadlock);
}
collect_unchecked();
Ok(())
}
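/// An error indicating an operation would deadlock because the current thread
/// already holds a [`CollectionGuard`].
///
/// A sketch of how it arises (crate name `refuse` assumed):
///
/// ```
/// use refuse::{CollectionGuard, WouldDeadlock};
///
/// let guard = CollectionGuard::acquire();
/// assert_eq!(refuse::try_collect(), Err(WouldDeadlock));
/// drop(guard);
/// ```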
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct WouldDeadlock;
impl Display for WouldDeadlock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("current thread has a non-yielding CollectionGuard")
}
}
fn collect_unchecked() {
let now = Instant::now();
CollectorCommand::Collect(now).send();
GlobalCollector::get().info.wait_for_collection(now);
}
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct TypeIndex(u32);
impl TypeIndex {
fn of<T: 'static>() -> TypeIndex {
let collector = GlobalCollector::get();
let types = collector.info.type_indexes.read();
let type_id = TypeId::of::<T>();
if let Some(index) = types.get(&type_id) {
*index
} else {
drop(types);
let mut types = collector.info.type_indexes.write();
let next_id = types.len();
*types
.entry(type_id)
.or_insert(TypeIndex(u32::try_from(next_id).expect("too many types")))
}
}
}
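/// A type-erased rooted reference.
///
/// Like [`Root<T>`], an `AnyRoot` keeps its value from being collected;
/// unlike `Root<T>`, the value's type is only known at runtime. A minimal
/// sketch (crate name `refuse` assumed):
///
/// ```
/// use refuse::{AnyRoot, CollectionGuard, Root};
///
/// let guard = CollectionGuard::acquire();
/// let root = Root::new(42_u32, &guard);
/// let any: AnyRoot = root.into_any_root();
/// assert_eq!(any.load::<u32>(), Some(&42));
/// let root: Root<u32> = any.downcast_root().expect("type matches");
/// assert_eq!(*root, 42);
/// ```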
#[derive(Eq, PartialEq, PartialOrd, Ord)]
pub struct AnyRoot {
rooted: *const (),
roots: *const AtomicU64,
any: AnyRef,
}
impl AnyRoot {
#[must_use]
pub fn load<T>(&self) -> Option<&T>
where
T: Collectable,
{
if TypeIndex::of::<T>() == self.any.type_index {
let rooted = unsafe { &*self.rooted.cast::<Rooted<T>>() };
Some(&rooted.value)
} else {
None
}
}
#[must_use]
pub fn downcast_root<T>(&self) -> Option<Root<T>>
where
T: Collectable,
{
if TypeIndex::of::<T>() == self.any.type_index {
let rooted = unsafe { &*self.rooted.cast::<Rooted<T>>() };
rooted.roots.fetch_add(1, Ordering::Relaxed);
Some(Root {
data: rooted,
reference: self.downcast_ref(),
})
} else {
None
}
}
#[must_use]
pub const fn downcast_ref<T>(&self) -> Ref<T>
where
T: Collectable,
{
self.any.downcast_ref()
}
#[must_use]
pub fn downcast_checked<T>(&self) -> Option<Ref<T>>
where
T: Collectable,
{
self.any.downcast_checked()
}
#[must_use]
pub const fn as_any(&self) -> AnyRef {
self.any
}
}
impl Clone for AnyRoot {
fn clone(&self) -> Self {
unsafe { &*self.roots }.fetch_add(1, Ordering::Acquire);
Self {
rooted: self.rooted,
roots: self.roots,
any: self.any,
}
}
}
impl Drop for AnyRoot {
fn drop(&mut self) {
if unsafe { &*self.roots }.fetch_sub(1, Ordering::Acquire) == 1 {
CollectorCommand::schedule_collect_if_needed();
}
}
}
impl Hash for AnyRoot {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.any.hash(state);
}
}
impl<T> From<Root<T>> for AnyRoot
where
T: Collectable,
{
fn from(value: Root<T>) -> Self {
value.into_any_root()
}
}
unsafe impl Send for AnyRoot {}
unsafe impl Sync for AnyRoot {}
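/// A type-erased weak reference to a garbage collected value.
///
/// An `AnyRef` records the thread, type, and slot a value was allocated in
/// and can be downcast back to a typed [`Ref`]. A minimal sketch (crate name
/// `refuse` assumed):
///
/// ```
/// use refuse::{AnyRef, CollectionGuard, Ref};
///
/// let guard = CollectionGuard::acquire();
/// let value = Ref::new(42_u32, &guard);
/// let any: AnyRef = value.as_any();
/// // Downcasting to the wrong type fails; the correct type succeeds.
/// assert!(any.downcast_checked::<String>().is_none());
/// assert_eq!(any.load::<u32>(&guard), Some(&42));
/// ```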
#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord)]
pub struct AnyRef {
bin_id: BinId,
creating_thread: CollectorThreadId,
type_index: TypeIndex,
slot_generation: u32,
}
impl AnyRef {
fn load_slot_from<'guard, T>(
&self,
bins: &Map<TypeIndex, Arc<dyn AnyBin>>,
guard: &'guard CollectionGuard<'_>,
) -> Option<&'guard Rooted<T>>
where
T: Collectable,
{
bins.get(&self.type_index)?
.as_any()
.downcast_ref::<Bin<T>>()?
.load(self.bin_id, self.slot_generation, guard)
}
fn load_slot<'guard, T>(&self, guard: &'guard CollectionGuard<'_>) -> Option<&'guard Rooted<T>>
where
T: Collectable,
{
if guard.thread.thread_id == self.creating_thread {
self.load_slot_from(&guard.bins().by_type.read(), guard)
} else {
let all_threads = guard.thread.all_threads.read();
let other_thread_bins = all_threads
.get(&self.creating_thread)
.and_then(Weak::upgrade)?;
let bins = guard.bins_for(&other_thread_bins);
let result = self.load_slot_from(&bins.by_type.read(), guard);
drop(other_thread_bins);
result
}
}
#[must_use]
pub const fn downcast_ref<T>(&self) -> Ref<T>
where
T: Collectable,
{
Ref {
any: *self,
_t: PhantomData,
}
}
pub fn upgrade(&self, guard: &CollectionGuard<'_>) -> Option<AnyRoot> {
if guard.thread.thread_id == self.creating_thread {
self.load_root_from(&guard.bins().by_type.read(), guard)
} else {
let all_threads = guard.thread.all_threads.read();
let other_thread_bins = all_threads
.get(&self.creating_thread)
.and_then(Weak::upgrade)?;
let bins = guard.bins_for(&other_thread_bins);
let result = self.load_root_from(&bins.by_type.read(), guard);
drop(other_thread_bins);
result
}
}
fn load_root_from(
&self,
bins: &Map<TypeIndex, Arc<dyn AnyBin>>,
guard: &CollectionGuard<'_>,
) -> Option<AnyRoot> {
bins.get(&self.type_index)?
.load_root(self.bin_id, self.slot_generation, guard)
}
#[must_use]
pub fn downcast_checked<T>(&self) -> Option<Ref<T>>
where
T: Collectable,
{
(TypeIndex::of::<T>() == self.type_index).then_some(self.downcast_ref())
}
#[must_use]
    pub fn downcast_root<T>(&self, guard: &CollectionGuard<'_>) -> Option<Root<T>>
where
T: Collectable,
{
self.downcast_ref().as_root(guard)
}
#[must_use]
pub fn load<'guard, T>(&self, guard: &'guard CollectionGuard<'_>) -> Option<&'guard T>
where
T: Collectable,
{
self.downcast_ref().load(guard)
}
pub fn load_mapped<'guard, T>(&self, guard: &'guard CollectionGuard<'_>) -> Option<&'guard T>
where
T: ?Sized + 'static,
{
if guard.thread.thread_id == self.creating_thread {
self.load_mapped_slot_from(&guard.bins().by_type.read(), guard)
} else {
let all_threads = guard.thread.all_threads.read();
let other_thread_bins = all_threads
.get(&self.creating_thread)
.and_then(Weak::upgrade)?;
let bins = guard.bins_for(&other_thread_bins);
let result = self.load_mapped_slot_from(&bins.by_type.read(), guard);
drop(other_thread_bins);
result
}
}
fn load_mapped_slot_from<'guard, T>(
&self,
bins: &Map<TypeIndex, Arc<dyn AnyBin>>,
guard: &'guard CollectionGuard<'_>,
) -> Option<&'guard T>
where
T: ?Sized + 'static,
{
let bins = bins.get(&self.type_index)?;
bins.mapper().downcast_ref::<Mapper<T>>()?.0.load_mapped(
self.bin_id,
self.slot_generation,
&**bins,
guard,
)
}
}
impl Trace for AnyRef {
const MAY_CONTAIN_REFERENCES: bool = true;
fn trace(&self, tracer: &mut Tracer) {
tracer.mark(self);
}
}
impl From<&'_ AnyRef> for AnyRef {
fn from(value: &'_ AnyRef) -> Self {
*value
}
}
impl<T> From<&'_ Ref<T>> for AnyRef
where
T: Collectable,
{
fn from(value: &'_ Ref<T>) -> Self {
value.as_any()
}
}
impl<T> From<Ref<T>> for AnyRef
where
T: Collectable,
{
fn from(value: Ref<T>) -> Self {
value.as_any()
}
}
impl<T> From<&'_ Root<T>> for AnyRef
where
T: Collectable,
{
fn from(value: &'_ Root<T>) -> Self {
value.downgrade_any()
}
}
impl Hash for AnyRef {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.type_index.hash(state);
self.creating_thread.hash(state);
self.slot_generation.hash(state);
self.bin_id.hash(state);
}
}
#[cfg(test)]
mod tests;