use super::cell::ACell;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, RwLock};
use std::{thread, time};
use uuid::Uuid;
pub(crate) struct Scheduler {
id: usize,
cells: RwLock<Vec<Arc<ACell>>>,
actor_starts: Mutex<VecDeque<Arc<ACell>>>,
actor_stops: Mutex<VecDeque<Arc<ACell>>>,
}
impl Scheduler {
pub(crate) fn new(id: usize) -> Scheduler {
Scheduler {
id,
cells: RwLock::new(Vec::new()),
actor_starts: Mutex::new(VecDeque::new()),
actor_stops: Mutex::new(VecDeque::new()),
}
}
pub(crate) fn register_new_cell(&self, cell: Arc<ACell>) {
let mut cells = self.cells.write().unwrap();
cells.push(cell.clone());
let mut starts = self.actor_starts.lock().unwrap();
starts.push_back(cell);
}
pub(crate) fn start(&self) {
let base_backoff_us = 2;
let max_backoff_us = 1_000_000;
let exp = 2;
let mut backoff_us = base_backoff_us;
loop {
trace!("[Tick] Scheduler {}", self.id);
let mut zero_work_loop = true;
{
{
let mut starts = self.actor_starts.lock().unwrap();
if let Some(actor) = starts.pop_front() {
actor.start();
zero_work_loop = false;
}
}
{
let mut stops = self.actor_stops.lock().unwrap();
let mut cells = self.cells.write().unwrap();
if let Some(actor) = stops.pop_front() {
actor.shutdown();
let cell_lookup = cells.iter().enumerate()
.find(|(_, c)| c.uuid() == actor.uuid())
.map(|(i, c)| (i, c.uuid()));
if let Some((id, uuid)) = cell_lookup {
cells.remove(id);
debug!("Removed cell from scheduler: {}", uuid);
}
zero_work_loop = false;
}
}
{
let cells = self.cells.read().unwrap();
cells.iter().for_each(|cell| {
if cell.process() {
zero_work_loop = false;
}
});
}
}
if zero_work_loop {
thread::sleep(time::Duration::from_micros(backoff_us));
trace!("Sleeping scheduler {} for {}us", self.id, backoff_us);
backoff_us = (backoff_us * exp).min(max_backoff_us);
} else {
backoff_us = base_backoff_us;
}
}
}
pub(crate) fn stop_actor(&self, uuid: Uuid) {
let cells = self.cells.read().unwrap();
let cell = cells.iter().find(|c| c.uuid() == uuid);
cell.map(|c| self.actor_stops.lock().unwrap().push_back(c.clone()));
}
}