//! The GC controller thread.
//!
//! MMTk has multiple GC threads: many GC worker threads and one GC controller thread.
//! The GC controller thread responds to GC requests and coordinates the workers to perform GC.
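//!
//! The controller is created by MMTk and handed to the VM binding, which typically runs it
//! on a dedicated thread.  The sketch below shows one way a binding might drive it; the
//! function `spawn_controller_thread` is a hypothetical name, not part of this crate's API.
//!
//! ```ignore
//! // Hypothetical binding-side code: `controller` is the boxed `GCController` that MMTk
//! // hands to the binding when asking for the controller thread to be spawned, and `tls`
//! // is a `VMWorkerThread` the binding created for that thread.
//! fn spawn_controller_thread<VM: VMBinding>(
//!     mut controller: Box<GCController<VM>>,
//!     tls: VMWorkerThread,
//! ) {
//!     std::thread::spawn(move || {
//!         // `run` never returns: it loops, serving one GC request per iteration.
//!         controller.run(tls);
//!     });
//! }
//! ```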

use std::sync::mpsc::Receiver;
use std::sync::Arc;

use crate::plan::gc_requester::GCRequester;
use crate::scheduler::gc_work::{EndOfGC, ScheduleCollection};
use crate::scheduler::CoordinatorMessage;
use crate::util::VMWorkerThread;
use crate::vm::VMBinding;
use crate::MMTK;
use atomic::Ordering;

use super::{GCWork, GCWorkScheduler, GCWorker};

/// The thread local struct for the GC controller, the counterpart of `GCWorker`.
pub struct GCController<VM: VMBinding> {
    /// The reference to the MMTk instance.
    mmtk: &'static MMTK<VM>,
    /// The reference to the GC requester.
    requester: Arc<GCRequester<VM>>,
    /// The reference to the scheduler.
    scheduler: Arc<GCWorkScheduler<VM>>,
    /// The receiving end of the channel to get controller/coordinator message from workers.
    receiver: Receiver<CoordinatorMessage<VM>>,
    /// The `GCWorker` used to execute work packets on the controller thread. In this sense,
    /// the controller is also a `GCWorker`.
    coordinator_worker: GCWorker<VM>,
}

impl<VM: VMBinding> GCController<VM> {
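    /// Create a boxed GC controller from its constituent parts.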
    pub fn new(
        mmtk: &'static MMTK<VM>,
        requester: Arc<GCRequester<VM>>,
        scheduler: Arc<GCWorkScheduler<VM>>,
        receiver: Receiver<CoordinatorMessage<VM>>,
        coordinator_worker: GCWorker<VM>,
    ) -> Box<GCController<VM>> {
        Box::new(Self {
            mmtk,
            requester,
            scheduler,
            receiver,
            coordinator_worker,
        })
    }

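    /// The main loop of the GC controller thread.  It blocks until a GC is requested
    /// (i.e. until another thread calls `GCRequester::request`), coordinates one complete
    /// GC, and then waits for the next request.  This method never returns.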
    pub fn run(&mut self, tls: VMWorkerThread) {
        // Initialize the GC worker for the coordinator. Since we are not using the `run()`
        // method from `GCWorker`, we initialize the worker manually here.
        self.coordinator_worker.tls = tls;

        loop {
            debug!("[STWController: Waiting for request...]");
            self.requester.wait_for_request();
            debug!("[STWController: Request recieved.]");

            // For heap growth logic
            // FIXME: This is not used. However, we probably want to set a 'user_triggered' flag
            // when GC is requested.
            // let user_triggered_collection: bool = SelectedPlan::is_user_triggered_collection();

            self.do_gc_until_completion();
            debug!("[STWController: Worker threads complete!]");
        }
    }

    /// Process a message from a worker. Returns `true` if the GC has finished.
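    /// A `Work` message carries a coordinator work packet to be executed on this thread;
    /// a `Finish` message indicates that the workers may have run out of work, in which
    /// case the controller checks whether the GC has actually finished.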
    fn process_message(&mut self, message: CoordinatorMessage<VM>) -> bool {
        let worker = &mut self.coordinator_worker;
        let mmtk = self.mmtk;
        match message {
            CoordinatorMessage::Work(mut work) => {
                work.do_work_with_stat(worker, mmtk);
                let old_count = self
                    .scheduler
                    .pending_coordinator_packets
                    .fetch_sub(1, Ordering::SeqCst);
                if old_count == 1 {
                    // When the coordinator has finished executing all coordinator work
                    // packets, it may now be possible to open more work buckets.
                    // Notify one worker so it can try to open buckets.
                    let _guard = self.scheduler.worker_monitor.0.lock().unwrap();
                    self.scheduler.worker_monitor.1.notify_one();
                }
                false
            }
            CoordinatorMessage::Finish => {
                // Quit only if all the buckets are empty.
                // For concurrent GCs, the coordinator thread may receive this message
                // while some buckets are still not empty. In that case, the coordinator
                // should ignore the message.
                let _guard = self.scheduler.worker_monitor.0.lock().unwrap();
                self.scheduler.worker_group.all_parked() && self.scheduler.all_buckets_empty()
            }
        }
    }

    /// Coordinate workers to perform GC in response to a GC request.
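    /// This schedules the collection, drains coordinator messages until the GC finishes,
    /// deactivates all work buckets, and finally executes `EndOfGC` to resume mutators.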
    pub fn do_gc_until_completion(&mut self) {
        // Schedule collection.
        ScheduleCollection.do_work_with_stat(&mut self.coordinator_worker, self.mmtk);

        // Drain the message queue and execute coordinator work.
        loop {
            let message = self.receiver.recv().unwrap();
            let finished = self.process_message(message);
            if finished {
                break;
            }
        }
        debug_assert!(!self.scheduler.worker_group.has_designated_work());
        // Multiple `Finish` messages may have been sent. Skip the extra ones.
        for message in self.receiver.try_iter() {
            match message {
                CoordinatorMessage::Work(_) => unreachable!(),
                CoordinatorMessage::Finish => {}
            }
        }
        self.scheduler.deactivate_all();
        // Finalization: resume mutators, reset GC states.
        // Note: Resuming mutators must happen after all work buckets are closed.
        //       Otherwise, for generational GCs, workers would receive and process
        //       newly generated remembered sets from those open buckets.
        //       But these remsets should be preserved until the next GC.
        EndOfGC.do_work_with_stat(&mut self.coordinator_worker, self.mmtk);

        self.scheduler.debug_assert_all_buckets_deactivated();
    }
}