use std::sync::mpsc::Receiver;
use std::sync::Arc;
use crate::plan::gc_requester::GCRequester;
use crate::scheduler::gc_work::{EndOfGC, ScheduleCollection};
use crate::scheduler::CoordinatorMessage;
use crate::util::VMWorkerThread;
use crate::vm::VMBinding;
use crate::MMTK;
use atomic::Ordering;
use super::{GCWork, GCWorkScheduler, GCWorker};
/// Coordinator of garbage collections.
///
/// A controller waits for GC requests (see `run`) and, for each request, drives one
/// collection from scheduling to its end-of-GC work (see `do_gc_until_completion`),
/// executing coordinator work packets itself along the way.
pub struct GCController<VM>
where
    VM: VMBinding,
{
    /// The MMTk instance; handed to every work packet the coordinator executes.
    mmtk: &'static MMTK<VM>,
    /// Source of GC requests; `run` blocks on it between collections.
    requester: Arc<GCRequester<VM>>,
    /// Shared scheduler whose worker monitor, worker group and work buckets are
    /// consulted while a collection is being coordinated.
    scheduler: Arc<GCWorkScheduler<VM>>,
    /// Receiving end of the channel that delivers `CoordinatorMessage`s to the coordinator.
    receiver: Receiver<CoordinatorMessage<VM>>,
    /// Worker context used to execute work packets on the controller thread itself.
    coordinator_worker: GCWorker<VM>,
}
impl<VM: VMBinding> GCController<VM> {
    /// Creates a boxed controller from its collaborators.
    ///
    /// The arguments are stored as given; nothing is scheduled and no request is
    /// waited for until [`run`](Self::run) is called.
    pub fn new(
        mmtk: &'static MMTK<VM>,
        requester: Arc<GCRequester<VM>>,
        scheduler: Arc<GCWorkScheduler<VM>>,
        receiver: Receiver<CoordinatorMessage<VM>>,
        coordinator_worker: GCWorker<VM>,
    ) -> Box<GCController<VM>> {
        Box::new(Self {
            mmtk,
            requester,
            scheduler,
            receiver,
            coordinator_worker,
        })
    }

    /// Main loop of the controller.
    ///
    /// Stores `tls` in the coordinator worker, then repeats forever: wait for a GC
    /// request, then perform one collection until it completes. The loop contains no
    /// exit, so this function only ends by panicking.
    pub fn run(&mut self, tls: VMWorkerThread) {
        // The coordinator worker executes packets on this thread, so it carries this
        // thread's handle.
        self.coordinator_worker.tls = tls;
        loop {
            debug!("[STWController: Waiting for request...]");
            self.requester.wait_for_request();
            debug!("[STWController: Request received.]");
            self.do_gc_until_completion();
            debug!("[STWController: Worker threads complete!]");
        }
    }

    /// Handles a single message delivered to the coordinator.
    ///
    /// * `Work`: executes the packet on the coordinator worker, decrements the
    ///   scheduler's pending-coordinator-packet counter, and returns `false`.
    /// * `Finish`: returns `true` only if, while holding the worker monitor lock,
    ///   all workers are parked and all buckets are empty; otherwise `false`, which
    ///   makes the caller keep waiting for further messages.
    ///
    /// # Panics
    ///
    /// Panics if the worker monitor lock cannot be acquired.
    fn process_message(&mut self, message: CoordinatorMessage<VM>) -> bool {
        let worker = &mut self.coordinator_worker;
        let mmtk = self.mmtk;
        match message {
            CoordinatorMessage::Work(mut work) => {
                work.do_work_with_stat(worker, mmtk);
                // `fetch_sub` yields the value *before* the decrement, so `1` means the
                // counter has just reached zero.
                let old_count = self
                    .scheduler
                    .pending_coordinator_packets
                    .fetch_sub(1, Ordering::SeqCst);
                if old_count == 1 {
                    // Wake one thread waiting on the worker monitor, holding the monitor
                    // lock while notifying.
                    // NOTE(review): presumably this lets a worker re-examine the buckets
                    // now that no coordinator packet is pending -- confirm against the
                    // scheduler.
                    let _guard = self
                        .scheduler
                        .worker_monitor
                        .0
                        .lock()
                        .expect("failed to lock the worker monitor");
                    self.scheduler.worker_monitor.1.notify_one();
                }
                false
            }
            CoordinatorMessage::Finish => {
                // The guard stays alive until the end of this arm, so both conditions
                // are evaluated under the monitor lock.
                let _guard = self
                    .scheduler
                    .worker_monitor
                    .0
                    .lock()
                    .expect("failed to lock the worker monitor");
                self.scheduler.worker_group.all_parked() && self.scheduler.all_buckets_empty()
            }
        }
    }

    /// Performs one collection from start to finish on behalf of a GC request.
    ///
    /// Steps, in order:
    /// 1. Execute `ScheduleCollection` on the coordinator worker.
    /// 2. Block on the message channel, processing messages until one of them
    ///    reports that the GC is finished.
    /// 3. Discard any further `Finish` messages still queued.
    /// 4. Deactivate all buckets and execute `EndOfGC` on the coordinator worker.
    ///
    /// # Panics
    ///
    /// Panics if the message channel is disconnected while waiting, if the worker
    /// monitor lock cannot be acquired, or if a `Work` message is still queued after
    /// the GC has been reported finished.
    pub fn do_gc_until_completion(&mut self) {
        ScheduleCollection.do_work_with_stat(&mut self.coordinator_worker, self.mmtk);

        // Process coordinator messages until the GC is reported finished.
        loop {
            // `recv` only fails once every sender has been dropped.
            let message = self
                .receiver
                .recv()
                .expect("coordinator message channel disconnected during a GC");
            let finished = self.process_message(message);
            if finished {
                break;
            }
        }

        debug_assert!(!self.scheduler.worker_group.has_designated_work());

        // More than one `Finish` may have been queued; drain the leftovers without
        // blocking. No `Work` message is expected at this point.
        for message in self.receiver.try_iter() {
            match message {
                CoordinatorMessage::Work(_) => {
                    unreachable!("coordinator work received after the GC finished")
                }
                CoordinatorMessage::Finish => {}
            }
        }

        // NOTE(review): the buckets are deactivated without holding the worker monitor
        // lock; confirm that no worker can examine or open buckets concurrently here.
        self.scheduler.deactivate_all();

        // `EndOfGC` runs only after every bucket has been deactivated; keep this order.
        EndOfGC.do_work_with_stat(&mut self.coordinator_worker, self.mmtk);

        self.scheduler.debug_assert_all_buckets_deactivated();
    }
}