use super::stat::WorkerLocalStat;
use super::work_bucket::*;
use super::*;
use crate::mmtk::MMTK;
use crate::util::copy::GCWorkerCopyContext;
use crate::util::opaque_pointer::*;
use crate::vm::{Collection, GCThreadContext, VMBinding};
use atomic::Atomic;
use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut};
use crossbeam::deque::{self, Stealer};
use crossbeam::queue::ArrayQueue;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
/// A worker's ordinal: its index within the worker group.
pub type ThreadId = usize;

thread_local! {
    /// The ordinal of the GC worker running on the current thread, or `None`
    /// if this thread is not a GC worker. Set once in `GCWorker::run`.
    static WORKER_ORDINAL: Atomic<Option<ThreadId>> = Atomic::new(None);
}

/// Return the ordinal of the current GC worker thread, or `None` when called
/// from a thread that is not a GC worker.
pub fn current_worker_ordinal() -> Option<ThreadId> {
    WORKER_ORDINAL.with(|x| x.load(Ordering::Relaxed))
}
/// The part of a GC worker's state that is shared with other threads
/// (e.g. harness calls from mutators, work stealing by other workers).
pub struct GCWorkerShared<VM: VMBinding> {
    /// Worker-local statistics. Guarded by `AtomicRefCell` because other threads
    /// may borrow it via `borrow_stat` while the GC runs (see `STAT_BORROWED_MSG`).
    stat: AtomicRefCell<WorkerLocalStat<VM>>,
    /// Work packets designated to this specific worker. Bounded queue; polled
    /// before any other work source in `GCWorker::poll`.
    pub designated_work: ArrayQueue<Box<dyn GCWork<VM>>>,
    /// Stealer end of a worker's local work deque, used by other workers to take
    /// packets from it. `None` when there is no stealable queue.
    pub stealer: Option<Stealer<Box<dyn GCWork<VM>>>>,
}
impl<VM: VMBinding> GCWorkerShared<VM> {
    /// Capacity of the bounded `designated_work` queue.
    /// Named constant instead of a magic `16` inline in the constructor.
    const DESIGNATED_WORK_CAPACITY: usize = 16;

    /// Create the shared part of a worker's state.
    ///
    /// `stealer` is the stealer end of the worker's local work deque, or `None`
    /// if the worker has no stealable queue.
    pub fn new(stealer: Option<Stealer<Box<dyn GCWork<VM>>>>) -> Self {
        Self {
            stat: Default::default(),
            designated_work: ArrayQueue::new(Self::DESIGNATED_WORK_CAPACITY),
            stealer,
        }
    }
}
/// The full state of one GC worker thread (thread-private plus shared parts).
pub struct GCWorker<VM: VMBinding> {
    /// VM-specific handle for this worker thread; starts uninitialized and is
    /// set in `run`.
    pub tls: VMWorkerThread,
    /// This worker's index within the worker group.
    pub ordinal: ThreadId,
    /// The scheduler this worker polls work packets from.
    scheduler: Arc<GCWorkScheduler<VM>>,
    /// Copy context for copying plans; a non-copy placeholder until `run`
    /// installs the plan-specific context.
    copy: GCWorkerCopyContext<VM>,
    /// Sender for `CoordinatorMessage`s — presumably delivered to the
    /// coordinator thread; the receiving end is not visible here.
    pub sender: Sender<CoordinatorMessage<VM>>,
    /// The MMTk instance this worker belongs to.
    pub mmtk: &'static MMTK<VM>,
    /// Whether this worker is the coordinator (queried via `is_coordinator()`).
    is_coordinator: bool,
    /// State shared with other threads: stats, designated work, stealer.
    pub shared: Arc<GCWorkerShared<VM>>,
    /// Owner end of this worker's work-stealing deque; other workers steal from
    /// it through `shared.stealer`.
    pub local_work_buffer: deque::Worker<Box<dyn GCWork<VM>>>,
}
// SAFETY: NOTE(review): `GCWorkerShared` is shared across threads behind `Arc`,
// and its fields (`AtomicRefCell`, `ArrayQueue`, `Stealer`) are thread-aware
// containers. These impls are presumably needed only because
// `Box<dyn GCWork<VM>>` is not declared `Send`/`Sync` — confirm that every
// `GCWork` implementation is in fact safe to move and share across worker
// threads, otherwise these impls are unsound.
unsafe impl<VM: VMBinding> Sync for GCWorkerShared<VM> {}
unsafe impl<VM: VMBinding> Send for GCWorkerShared<VM> {}

/// Panic message used when `GCWorkerShared::stat` has a conflicting borrow.
const STAT_BORROWED_MSG: &str = "GCWorkerShared.stat is already borrowed. This may happen if \
    the mutator calls harness_begin or harness_end while the GC is running.";
impl<VM: VMBinding> GCWorkerShared<VM> {
    /// Immutably borrow the worker-local statistics.
    ///
    /// # Panics
    /// Panics with `STAT_BORROWED_MSG` if the stats are currently mutably borrowed.
    pub fn borrow_stat(&self) -> AtomicRef<WorkerLocalStat<VM>> {
        let borrowed = self.stat.try_borrow();
        borrowed.expect(STAT_BORROWED_MSG)
    }

    /// Mutably borrow the worker-local statistics.
    ///
    /// # Panics
    /// Panics with `STAT_BORROWED_MSG` if the stats are currently borrowed.
    pub fn borrow_stat_mut(&self) -> AtomicRefMut<WorkerLocalStat<VM>> {
        let borrowed = self.stat.try_borrow_mut();
        borrowed.expect(STAT_BORROWED_MSG)
    }
}
impl<VM: VMBinding> GCWorker<VM> {
    /// Create a new worker.
    ///
    /// `tls` starts as `VMThread::UNINITIALIZED` and `copy` as a non-copy
    /// placeholder; both are properly installed later, in [`GCWorker::run`].
    pub fn new(
        mmtk: &'static MMTK<VM>,
        ordinal: ThreadId,
        scheduler: Arc<GCWorkScheduler<VM>>,
        is_coordinator: bool,
        sender: Sender<CoordinatorMessage<VM>>,
        shared: Arc<GCWorkerShared<VM>>,
        local_work_buffer: deque::Worker<Box<dyn GCWork<VM>>>,
    ) -> Self {
        Self {
            tls: VMWorkerThread(VMThread::UNINITIALIZED),
            ordinal,
            copy: GCWorkerCopyContext::new_non_copy(),
            sender,
            scheduler,
            mmtk,
            is_coordinator,
            shared,
            local_work_buffer,
        }
    }

    /// Upper bound on packets held in `local_work_buffer`; beyond this, new
    /// packets spill to the global work buckets.
    const LOCALLY_CACHED_WORK_PACKETS: usize = 16;

    /// Whether a new packet for `bucket` must go to the global bucket instead of
    /// the local buffer: either the bucket is not activated yet (locally buffered
    /// packets are executed by `poll` regardless of bucket stage, so buffering
    /// them would run them too early), or the local buffer is full.
    ///
    /// Shared by `add_work` and `add_work_prioritized`, which previously
    /// duplicated this condition.
    fn must_add_to_global_bucket(&self, bucket: WorkBucketStage) -> bool {
        !self.scheduler.work_buckets[bucket].is_activated()
            || self.local_work_buffer.len() >= Self::LOCALLY_CACHED_WORK_PACKETS
    }

    /// Add a packet to `bucket` with priority, or buffer it locally when the
    /// bucket is open and the local buffer has room.
    pub fn add_work_prioritized(&mut self, bucket: WorkBucketStage, work: impl GCWork<VM>) {
        if self.must_add_to_global_bucket(bucket) {
            self.scheduler.work_buckets[bucket].add_prioritized(Box::new(work));
            return;
        }
        self.local_work_buffer.push(Box::new(work));
    }

    /// Add a packet to `bucket`, or buffer it locally when the bucket is open
    /// and the local buffer has room.
    pub fn add_work(&mut self, bucket: WorkBucketStage, work: impl GCWork<VM>) {
        if self.must_add_to_global_bucket(bucket) {
            self.scheduler.work_buckets[bucket].add(work);
            return;
        }
        self.local_work_buffer.push(Box::new(work));
    }

    /// Whether this worker is the coordinator.
    pub fn is_coordinator(&self) -> bool {
        self.is_coordinator
    }

    /// The scheduler this worker polls work from.
    pub fn scheduler(&self) -> &GCWorkScheduler<VM> {
        &self.scheduler
    }

    /// Mutable access to the worker's copy context (plan-specific once `run`
    /// has installed it).
    pub fn get_copy_context_mut(&mut self) -> &mut GCWorkerCopyContext<VM> {
        &mut self.copy
    }

    /// Execute one packet immediately on this worker.
    pub fn do_work(&'static mut self, mut work: impl GCWork<VM>) {
        work.do_work(self, self.mmtk);
    }

    /// Get the next packet to execute: designated work first, then the local
    /// buffer, and finally the scheduler.
    fn poll(&self) -> Box<dyn GCWork<VM>> {
        self.shared
            .designated_work
            .pop()
            .or_else(|| self.local_work_buffer.pop())
            .unwrap_or_else(|| self.scheduler().poll(self))
    }

    /// Execute one already-boxed packet immediately on this worker.
    pub fn do_boxed_work(&'static mut self, mut work: Box<dyn GCWork<VM>>) {
        work.do_work(self, self.mmtk);
    }

    /// Entry point of a worker thread: record the ordinal in thread-local
    /// storage, resolve this worker's affinity via the scheduler, install the
    /// plan-specific copy context, then loop forever polling and executing
    /// packets (with per-packet statistics). This loop never returns.
    pub fn run(&mut self, tls: VMWorkerThread, mmtk: &'static MMTK<VM>) {
        WORKER_ORDINAL.with(|x| x.store(Some(self.ordinal), Ordering::SeqCst));
        self.scheduler.resolve_affinity(self.ordinal);
        self.tls = tls;
        self.copy = crate::plan::create_gc_worker_context(tls, mmtk);
        loop {
            let mut work = self.poll();
            work.do_work_with_stat(self, mmtk);
        }
    }
}
/// A group of GC workers and the bookkeeping shared across them.
pub struct WorkerGroup<VM: VMBinding> {
    /// The shared part of each worker's state, indexed by worker ordinal.
    pub workers_shared: Vec<Arc<GCWorkerShared<VM>>>,
    /// Number of currently parked workers; maintained via `ParkingGuard` /
    /// `inc_parked_workers` / `dec_parked_workers`.
    parked_workers: AtomicUsize,
    /// Local work queues created in `new` and handed out to workers in `spawn`
    /// (empty afterwards).
    unspawned_local_work_queues: Mutex<Vec<deque::Worker<Box<dyn GCWork<VM>>>>>,
}
impl<VM: VMBinding> WorkerGroup<VM> {
    /// Create a group of `num_workers` shared worker states plus their
    /// (not-yet-spawned) local work queues. Queue `i`'s stealer is stored in
    /// `workers_shared[i]`.
    pub fn new(num_workers: usize) -> Arc<Self> {
        let unspawned_local_work_queues: Vec<_> = (0..num_workers)
            .map(|_| deque::Worker::new_fifo())
            .collect();
        let workers_shared = unspawned_local_work_queues
            .iter()
            .map(|queue| Arc::new(GCWorkerShared::<VM>::new(Some(queue.stealer()))))
            .collect();
        Arc::new(Self {
            workers_shared,
            parked_workers: Default::default(),
            unspawned_local_work_queues: Mutex::new(unspawned_local_work_queues),
        })
    }

    /// Spawn one GC thread per worker, handing each worker its local work queue.
    ///
    /// Queues are taken front-to-back so that worker `ordinal` owns the queue
    /// whose stealer is stored in `workers_shared[ordinal]`. (A back-to-front
    /// `pop()` would cross-pair them: worker 0 would own the last queue while
    /// `workers_shared[0].stealer` steals from queue 0.)
    pub fn spawn(
        &self,
        mmtk: &'static MMTK<VM>,
        sender: Sender<CoordinatorMessage<VM>>,
        tls: VMThread,
    ) {
        let mut unspawned_local_work_queues = self.unspawned_local_work_queues.lock().unwrap();
        for (ordinal, shared) in self.workers_shared.iter().enumerate() {
            // `remove(0)` is O(n) per call, but this runs once per worker at
            // startup only, and it keeps the fail-fast panic if `spawn` is
            // somehow called with too few queues (e.g. called twice).
            let queue = unspawned_local_work_queues.remove(0);
            let worker = Box::new(GCWorker::new(
                mmtk,
                ordinal,
                mmtk.scheduler.clone(),
                false,
                sender.clone(),
                shared.clone(),
                queue,
            ));
            VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Worker(worker));
        }
        debug_assert!(unspawned_local_work_queues.is_empty());
    }

    /// Number of workers in the group.
    pub fn worker_count(&self) -> usize {
        self.workers_shared.len()
    }

    /// Increment the parked-worker count.
    ///
    /// Returns `true` if this call made every worker in the group parked.
    pub fn inc_parked_workers(&self) -> bool {
        let old = self.parked_workers.fetch_add(1, Ordering::SeqCst);
        debug_assert!(old < self.worker_count());
        old + 1 == self.worker_count()
    }

    /// Decrement the parked-worker count.
    pub fn dec_parked_workers(&self) {
        let old = self.parked_workers.fetch_sub(1, Ordering::SeqCst);
        // An unmatched decrement (`old == 0`) would wrap the counter around;
        // the original `old <= worker_count()` check alone cannot catch that.
        debug_assert!(old > 0);
        debug_assert!(old <= self.worker_count());
    }

    /// Current number of parked workers.
    pub fn parked_workers(&self) -> usize {
        self.parked_workers.load(Ordering::SeqCst)
    }

    /// Whether every worker in the group is parked.
    pub fn all_parked(&self) -> bool {
        self.parked_workers() == self.worker_count()
    }

    /// Whether any worker still has pending designated work.
    pub fn has_designated_work(&self) -> bool {
        self.workers_shared
            .iter()
            .any(|w| !w.designated_work.is_empty())
    }
}
/// RAII guard that counts the current worker as parked for as long as it is
/// alive: construction increments the group's parked counter, dropping it
/// decrements it again.
///
/// `#[must_use]`: discarding the guard immediately unparks the worker, which is
/// almost certainly a bug at the call site.
#[must_use]
pub(crate) struct ParkingGuard<'a, VM: VMBinding> {
    /// The group whose parked counter this guard manipulates.
    worker_group: &'a WorkerGroup<VM>,
    /// Whether creating this guard made all workers parked (cached at
    /// construction time; see `all_parked()`).
    all_parked: bool,
}
impl<'a, VM: VMBinding> ParkingGuard<'a, VM> {
    /// Park the current worker: increments `worker_group`'s parked counter and
    /// records whether that made every worker parked.
    pub fn new(worker_group: &'a WorkerGroup<VM>) -> Self {
        Self {
            all_parked: worker_group.inc_parked_workers(),
            worker_group,
        }
    }

    /// Whether parking this worker made every worker in the group parked
    /// (as observed at construction time).
    pub fn all_parked(&self) -> bool {
        self.all_parked
    }
}
impl<'a, VM: VMBinding> Drop for ParkingGuard<'a, VM> {
    /// Unpark: decrement the group's parked counter when the guard goes out of
    /// scope, matching the increment done in `ParkingGuard::new`.
    fn drop(&mut self) {
        self.worker_group.dec_parked_workers();
    }
}