use super::stat::WorkerLocalStat;
use super::work_bucket::*;
use super::*;
use crate::mmtk::MMTK;
use crate::util::copy::GCWorkerCopyContext;
use crate::util::heap::layout::heap_parameters::MAX_SPACES;
use crate::util::opaque_pointer::*;
use crate::util::ObjectReference;
use crate::vm::{Collection, GCThreadContext, VMBinding};
use atomic::Atomic;
use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut};
use crossbeam::deque::{self, Stealer};
use crossbeam::queue::ArrayQueue;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
/// The type of a GC worker's ordinal: an index in `0..num_workers`.
pub type ThreadId = usize;

thread_local! {
    /// The ordinal of the GC worker running on the current thread.
    /// Initialized to `ThreadId::MAX` (meaning "not a worker thread yet");
    /// set by `GCWorker::run` when the worker starts.
    static WORKER_ORDINAL: Atomic<ThreadId> = const { Atomic::new(ThreadId::MAX) };
}
/// Return the ordinal of the GC worker running on the current thread.
///
/// Debug builds assert that this is called from a worker thread, i.e. after
/// `GCWorker::run` has published the ordinal into `WORKER_ORDINAL`.
pub fn current_worker_ordinal() -> ThreadId {
    let result = WORKER_ORDINAL.with(|ordinal| ordinal.load(Ordering::Relaxed));
    debug_assert_ne!(
        result,
        ThreadId::MAX,
        "Thread-local variable WORKER_ORDINAL not set yet."
    );
    result
}
/// The part of a GC worker's state that is shared with (and may be accessed by) other threads.
pub struct GCWorkerShared<VM: VMBinding> {
    /// Worker-local statistics. Runtime-checked borrowing via `AtomicRefCell`;
    /// accessed through `borrow_stat` / `borrow_stat_mut`.
    stat: AtomicRefCell<WorkerLocalStat<VM>>,
    /// Live bytes recorded by this worker, indexed by space index
    /// (see `increase_live_bytes` and `WorkerGroup::get_and_clear_worker_live_bytes`).
    pub live_bytes_per_space: AtomicRefCell<[usize; MAX_SPACES]>,
    /// Work packets that must be executed by this specific worker.
    /// `GCWorker::poll` drains this queue before any other work source.
    pub designated_work: ArrayQueue<Box<dyn GCWork<VM>>>,
    /// The stealer end of this worker's local work queue
    /// (created from the matching queue in `WorkerGroup::new`).
    pub stealer: Option<Stealer<Box<dyn GCWork<VM>>>>,
}
impl<VM: VMBinding> GCWorkerShared<VM> {
    /// Capacity of the `designated_work` queue. A named constant (instead of a bare `16`)
    /// for consistency with `GCWorker::LOCALLY_CACHED_WORK_PACKETS`.
    const DESIGNATED_WORK_CAPACITY: usize = 16;

    /// Create the shared part of a worker's state.
    ///
    /// `stealer` is the stealer end of the worker's local work queue, if any.
    pub fn new(stealer: Option<Stealer<Box<dyn GCWork<VM>>>>) -> Self {
        Self {
            stat: Default::default(),
            live_bytes_per_space: AtomicRefCell::new([0; MAX_SPACES]),
            designated_work: ArrayQueue::new(Self::DESIGNATED_WORK_CAPACITY),
            stealer,
        }
    }

    /// Add the current size of `object` to the live-bytes counter of the space
    /// that contains it.
    ///
    /// Objects whose address resolves to an uninitialized space descriptor are
    /// silently ignored.
    pub(crate) fn increase_live_bytes(
        live_bytes_per_space: &mut [usize; MAX_SPACES],
        object: ObjectReference,
    ) {
        use crate::mmtk::VM_MAP;
        use crate::vm::object_model::ObjectModel;
        let bytes = VM::VMObjectModel::get_current_size(object);
        let space_descriptor = VM_MAP.get_descriptor_for_address(object.to_raw_address());
        if space_descriptor != crate::util::heap::space_descriptor::SpaceDescriptor::UNINITIALIZED {
            let space_index = space_descriptor.get_index();
            debug_assert!(
                space_index < MAX_SPACES,
                "Space index {} is not in the range of [0, {})",
                space_index,
                MAX_SPACES
            );
            live_bytes_per_space[space_index] += bytes;
        }
    }
}
/// A GC worker. This part is privately owned by the worker thread itself.
pub struct GCWorker<VM: VMBinding> {
    /// The VM-specific thread handle of this worker; `UNINITIALIZED` until `run` sets it.
    pub tls: VMWorkerThread,
    /// The ordinal of this worker, assigned by `WorkerGroup::create_workers`
    /// as an index in `0..num_workers`.
    pub ordinal: ThreadId,
    /// The scheduler this worker polls for work.
    scheduler: Arc<GCWorkScheduler<VM>>,
    /// The copy context; starts as a non-copy placeholder and is replaced in `run`.
    copy: GCWorkerCopyContext<VM>,
    /// The MMTk instance this worker belongs to.
    pub mmtk: &'static MMTK<VM>,
    /// The part of this worker's state shared with other threads.
    pub shared: Arc<GCWorkerShared<VM>>,
    /// Worker-local FIFO of work packets; its stealer end lives in `shared.stealer`.
    pub local_work_buffer: deque::Worker<Box<dyn GCWork<VM>>>,
}
// SAFETY: NOTE(review): the auto impls are presumably blocked by `Box<dyn GCWork<VM>>`
// carrying no `Send`/`Sync` bound here. Cross-thread access to the fields goes through
// `AtomicRefCell`, `ArrayQueue` and `Stealer`, which provide their own synchronization —
// confirm that work packets are only transferred between threads through those queues.
unsafe impl<VM: VMBinding> Sync for GCWorkerShared<VM> {}
unsafe impl<VM: VMBinding> Send for GCWorkerShared<VM> {}
/// Panic message used when `GCWorkerShared::stat` is borrowed while an earlier
/// borrow is still live (see `borrow_stat` / `borrow_stat_mut`).
const STAT_BORROWED_MSG: &str = "GCWorkerShared.stat is already borrowed. This may happen if \
    the mutator calls harness_begin or harness_end while the GC is running.";
impl<VM: VMBinding> GCWorkerShared<VM> {
    /// Immutably borrow the worker-local statistics.
    ///
    /// # Panics
    /// Panics with `STAT_BORROWED_MSG` if `stat` is currently mutably borrowed.
    pub fn borrow_stat(&self) -> AtomicRef<'_, WorkerLocalStat<VM>> {
        self.stat.try_borrow().expect(STAT_BORROWED_MSG)
    }

    /// Mutably borrow the worker-local statistics.
    ///
    /// # Panics
    /// Panics with `STAT_BORROWED_MSG` if `stat` is currently borrowed at all.
    pub fn borrow_stat_mut(&self) -> AtomicRefMut<'_, WorkerLocalStat<VM>> {
        self.stat.try_borrow_mut().expect(STAT_BORROWED_MSG)
    }
}
/// Zero-sized "error" value signalling that a worker should leave its main loop
/// (returned through `PollResult` and handled in `GCWorker::run`).
#[derive(Debug)]
pub(crate) struct WorkerShouldExit;

/// The result of polling for work: either the next packet to execute,
/// or a signal that the worker should exit.
pub(crate) type PollResult<VM> = Result<Box<dyn GCWork<VM>>, WorkerShouldExit>;
impl<VM: VMBinding> GCWorker<VM> {
    /// Create a worker. `tls` starts uninitialized and `copy` starts as a non-copy
    /// placeholder; both are given their real values in `run`, on the worker thread.
    pub(crate) fn new(
        mmtk: &'static MMTK<VM>,
        ordinal: ThreadId,
        scheduler: Arc<GCWorkScheduler<VM>>,
        shared: Arc<GCWorkerShared<VM>>,
        local_work_buffer: deque::Worker<Box<dyn GCWork<VM>>>,
    ) -> Self {
        Self {
            tls: VMWorkerThread(VMThread::UNINITIALIZED),
            ordinal,
            copy: GCWorkerCopyContext::new_non_copy(),
            scheduler,
            mmtk,
            shared,
            local_work_buffer,
        }
    }

    /// Cap on the worker-local buffer: once `local_work_buffer` holds this many packets,
    /// new work goes to the global buckets instead, where other workers can reach it.
    const LOCALLY_CACHED_WORK_PACKETS: usize = 16;

    /// Add a work packet to the given bucket with high priority.
    ///
    /// If the bucket is open and the local buffer has room, the packet is cached in the
    /// local buffer instead (which has no priority ordering); otherwise it is pushed to
    /// the bucket's prioritized queue.
    pub fn add_work_prioritized(&mut self, bucket: WorkBucketStage, work: impl GCWork<VM>) {
        if !self.scheduler().work_buckets[bucket].is_open()
            || self.local_work_buffer.len() >= Self::LOCALLY_CACHED_WORK_PACKETS
        {
            self.scheduler.work_buckets[bucket].add_prioritized(Box::new(work));
            return;
        }
        self.local_work_buffer.push(Box::new(work));
    }

    /// Add a work packet to the given bucket.
    ///
    /// If the bucket is open and the local buffer has room, the packet is cached in the
    /// local buffer; otherwise it is pushed to the bucket's normal queue.
    pub fn add_work(&mut self, bucket: WorkBucketStage, work: impl GCWork<VM>) {
        if !self.scheduler().work_buckets[bucket].is_open()
            || self.local_work_buffer.len() >= Self::LOCALLY_CACHED_WORK_PACKETS
        {
            self.scheduler.work_buckets[bucket].add(work);
            return;
        }
        self.local_work_buffer.push(Box::new(work));
    }

    /// Get the scheduler this worker polls for work.
    pub fn scheduler(&self) -> &GCWorkScheduler<VM> {
        &self.scheduler
    }

    /// Get a mutable reference to this worker's copy context.
    pub fn get_copy_context_mut(&mut self) -> &mut GCWorkerCopyContext<VM> {
        &mut self.copy
    }

    /// Get the next work packet to execute, in priority order:
    /// designated work first, then the local buffer, then the scheduler
    /// (which may instead return `WorkerShouldExit`).
    fn poll(&mut self) -> PollResult<VM> {
        if let Some(work) = self.shared.designated_work.pop() {
            return Ok(work);
        }
        if let Some(work) = self.local_work_buffer.pop() {
            return Ok(work);
        }
        self.scheduler().poll(self)
    }

    /// Entry point of the worker thread: poll and execute work packets until the
    /// scheduler signals exit, then surrender this `GCWorker` struct back to the
    /// scheduler so it can be re-spawned later.
    pub fn run(mut self: Box<Self>, tls: VMWorkerThread, mmtk: &'static MMTK<VM>) {
        probe!(mmtk, gcworker_run);
        debug!(
            "Worker started. ordinal: {}, {}",
            self.ordinal,
            crate::util::rust_util::debug_process_thread_id(),
        );
        // Publish the ordinal so `current_worker_ordinal` works on this thread.
        WORKER_ORDINAL.with(|x| x.store(self.ordinal, Ordering::SeqCst));
        self.scheduler.resolve_affinity(self.ordinal);
        self.tls = tls;
        // Replace the placeholder copy context now that we have a real worker tls.
        self.copy = crate::plan::create_gc_worker_context(tls, mmtk);
        loop {
            probe!(mmtk, work_poll);
            // `Err(WorkerShouldExit)` ends the loop.
            let Ok(mut work) = self.poll() else {
                break;
            };
            #[allow(unused_variables)]
            let typename = work.get_type_name();

            // NOTE(review): presumably this read forces the type-name bytes to be resident
            // before the tracepoint fires, as a workaround for bpftrace — confirm against
            // the documentation of the "bpftrace_workaround" feature.
            #[cfg(feature = "bpftrace_workaround")]
            std::hint::black_box(unsafe { *(typename.as_ptr()) });
            probe!(mmtk, work, typename.as_ptr(), typename.len());
            work.do_work_with_stat(&mut self, mmtk);
        }
        debug!(
            "Worker exiting. ordinal: {}, {}",
            self.ordinal,
            crate::util::rust_util::debug_process_thread_id(),
        );
        probe!(mmtk, gcworker_exit);
        mmtk.scheduler.surrender_gc_worker(self);
    }
}
/// Lifecycle state of the `GCWorker` structs owned by a `WorkerGroup`.
enum WorkerCreationState<VM: VMBinding> {
    /// `GCWorker` structs have not been created yet; holds one local work queue
    /// per future worker (consumed by `create_workers`).
    Initial {
        local_work_queues: Vec<deque::Worker<Box<dyn GCWork<VM>>>>,
    },
    /// Worker threads are running and own their `GCWorker` structs.
    Spawned,
    /// Workers have handed their structs back (at the end of `GCWorker::run`);
    /// they are buffered here until `respawn`.
    Surrendered {
        #[allow(clippy::vec_box)]
        workers: Vec<Box<GCWorker<VM>>>,
    },
}
/// A group of GC workers together with the state needed to spawn and re-spawn them.
pub(crate) struct WorkerGroup<VM: VMBinding> {
    /// The shared part of each worker's state, one entry per worker.
    pub workers_shared: Vec<Arc<GCWorkerShared<VM>>>,
    /// Lifecycle state; wrapped in `Option` so transitions can `take()` the old
    /// state out while holding the lock.
    state: Mutex<Option<WorkerCreationState<VM>>>,
}

// SAFETY: NOTE(review): presumably the auto `Sync` impl is blocked by
// `Box<dyn GCWork<VM>>` (no `Send`/`Sync` bound visible here). Shared access goes
// through the `Mutex` and `GCWorkerShared`'s own synchronization — confirm.
unsafe impl<VM: VMBinding> Sync for WorkerGroup<VM> {}
impl<VM: VMBinding> WorkerGroup<VM> {
    /// Create a worker group of `num_workers` workers, building one local work queue
    /// per worker and wiring each queue's stealer into the matching shared state.
    pub fn new(num_workers: usize) -> Arc<Self> {
        let mut local_work_queues = Vec::with_capacity(num_workers);
        let mut workers_shared = Vec::with_capacity(num_workers);
        for _ in 0..num_workers {
            let queue = deque::Worker::new_fifo();
            workers_shared.push(Arc::new(GCWorkerShared::<VM>::new(Some(queue.stealer()))));
            local_work_queues.push(queue);
        }
        Arc::new(Self {
            workers_shared,
            state: Mutex::new(Some(WorkerCreationState::Initial { local_work_queues })),
        })
    }

    /// Create the `GCWorker` structs and spawn the worker threads for the first time.
    ///
    /// # Panics
    /// Panics if the group is not in the `Initial` state.
    pub fn initial_spawn(&self, tls: VMThread, mmtk: &'static MMTK<VM>) {
        let mut guard = self.state.lock().unwrap();
        let queues = match guard.take().unwrap() {
            WorkerCreationState::Initial { local_work_queues } => local_work_queues,
            _ => panic!("GCWorker structs have already been created"),
        };
        let workers = self.create_workers(queues, mmtk);
        self.spawn(workers, tls);
        *guard = Some(WorkerCreationState::Spawned);
    }

    /// Re-spawn worker threads using the previously surrendered `GCWorker` structs.
    ///
    /// # Panics
    /// Panics if the group is not in the `Surrendered` state.
    pub fn respawn(&self, tls: VMThread) {
        let mut guard = self.state.lock().unwrap();
        let workers = match guard.take().unwrap() {
            WorkerCreationState::Surrendered { workers } => workers,
            _ => panic!("GCWorker structs have not been created, yet."),
        };
        self.spawn(workers, tls);
        *guard = Some(WorkerCreationState::Spawned)
    }

    /// Build one `GCWorker` per local work queue, pairing each queue with the
    /// shared state of the same ordinal.
    #[allow(clippy::vec_box)]
    fn create_workers(
        &self,
        local_work_queues: Vec<deque::Worker<Box<dyn GCWork<VM>>>>,
        mmtk: &'static MMTK<VM>,
    ) -> Vec<Box<GCWorker<VM>>> {
        debug!("Creating GCWorker instances...");
        assert_eq!(self.workers_shared.len(), local_work_queues.len());
        let mut workers = Vec::with_capacity(self.workers_shared.len());
        let paired = local_work_queues.into_iter().zip(self.workers_shared.iter());
        for (ordinal, (queue, shared)) in paired.enumerate() {
            workers.push(Box::new(GCWorker::new(
                mmtk,
                ordinal,
                mmtk.scheduler.clone(),
                shared.clone(),
                queue,
            )));
        }
        debug!("Created {} GCWorker instances.", workers.len());
        workers
    }

    /// Ask the VM binding to spawn one GC thread per `GCWorker` struct.
    #[allow(clippy::vec_box)]
    fn spawn(&self, workers: Vec<Box<GCWorker<VM>>>, tls: VMThread) {
        debug!(
            "Spawning GC workers. {}",
            crate::util::rust_util::debug_process_thread_id(),
        );
        for worker in workers {
            VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Worker(worker));
        }
        debug!(
            "Spawned {} worker threads. {}",
            self.worker_count(),
            crate::util::rust_util::debug_process_thread_id(),
        );
    }

    /// Transition from `Spawned` to `Surrendered` with an empty buffer ready to
    /// receive the workers' structs.
    pub fn prepare_surrender_buffer(&self) {
        let mut guard = self.state.lock().unwrap();
        assert!(matches!(*guard, Some(WorkerCreationState::Spawned)));
        *guard = Some(WorkerCreationState::Surrendered {
            workers: Vec::with_capacity(self.worker_count()),
        })
    }

    /// Return one worker's struct to the group.
    /// Returns `true` when every worker in the group has surrendered.
    ///
    /// # Panics
    /// Panics if the group is not in the `Surrendered` state.
    pub fn surrender_gc_worker(&self, worker: Box<GCWorker<VM>>) -> bool {
        let mut guard = self.state.lock().unwrap();
        let buffered = match guard.as_mut().unwrap() {
            WorkerCreationState::Surrendered { workers } => workers,
            _ => panic!("GCWorker structs have not been created, yet."),
        };
        let ordinal = worker.ordinal;
        buffered.push(worker);
        trace!(
            "Worker {} surrendered. ({}/{})",
            ordinal,
            buffered.len(),
            self.worker_count()
        );
        buffered.len() == self.worker_count()
    }

    /// The number of workers in this group.
    pub fn worker_count(&self) -> usize {
        self.workers_shared.len()
    }

    /// Whether any worker still has designated (worker-specific) work queued.
    pub fn has_designated_work(&self) -> bool {
        for shared in self.workers_shared.iter() {
            if !shared.designated_work.is_empty() {
                return true;
            }
        }
        false
    }

    /// Sum the per-space live-bytes counters across all workers, resetting each
    /// counter to zero as it is read.
    pub fn get_and_clear_worker_live_bytes(&self) -> [usize; MAX_SPACES] {
        let mut totals = [0; MAX_SPACES];
        for shared in self.workers_shared.iter() {
            let mut per_space = shared.live_bytes_per_space.borrow_mut();
            for (total, count) in totals.iter_mut().zip(per_space.iter_mut()) {
                *total += std::mem::take(count);
            }
        }
        totals
    }
}