use rain_core::utils::*;
use std::clone::Clone;
use std::collections::hash_map::HashMap;
use server::graph::{DataObjectRef, GovernorRef, Graph, SessionRef, TaskRef, TaskState};
/// Result of one `ReactiveScheduler::schedule` pass.
#[derive(Debug, Clone, Default)]
pub struct UpdatedOut {
    /// Tasks that were assigned to a governor during the pass.
    pub(in super::super) tasks: RcSet<TaskRef>,
    /// For each governor that received a task, the outputs of those tasks
    /// that were scheduled onto it during the pass.
    pub(in super::super) objects: HashMap<GovernorRef, RcSet<DataObjectRef>>,
}
/// Pending graph changes handed to `ReactiveScheduler::schedule`.
#[derive(Debug, Clone, Default)]
pub struct UpdatedIn {
    /// Tasks scanned by the scheduler for the `Ready` state.
    /// Presumably the tasks added since the last pass — TODO confirm with the producer.
    pub(in super::super) new_tasks: RcSet<TaskRef>,
    /// Not read by the scheduler in this file.
    /// Presumably the data objects added since the last pass — TODO confirm.
    pub(in super::super) new_objects: RcSet<DataObjectRef>,
    /// Tasks scanned by the scheduler for the `Ready` state.
    /// Presumably already-known tasks whose state changed — TODO confirm with the producer.
    pub(in super::super) tasks: RcSet<TaskRef>,
    /// Not read by the scheduler in this file.
    /// Presumably, per data object, the governors concerned by the change — TODO confirm.
    pub(in super::super) objects: HashMap<DataObjectRef, RcSet<GovernorRef>>,
}
impl UpdatedIn {
    /// Returns `true` when none of the four collections holds an entry.
    pub fn is_empty(&self) -> bool {
        if !self.new_tasks.is_empty() || !self.new_objects.is_empty() {
            return false;
        }
        self.tasks.is_empty() && self.objects.is_empty()
    }

    /// Empties all four collections.
    ///
    /// `tasks` and `objects` are cleared in place, while `new_tasks` and
    /// `new_objects` are replaced by fresh default values.
    pub fn clear(&mut self) {
        self.tasks.clear();
        self.objects.clear();
        self.new_objects = Default::default();
        self.new_tasks = Default::default();
    }

    /// Removes `task_ref` from both task sets (`tasks` and `new_tasks`).
    ///
    /// `objects` and `new_objects` are left untouched.
    pub fn remove_task(&mut self, task_ref: &TaskRef) {
        self.tasks.remove(task_ref);
        self.new_tasks.remove(task_ref);
    }
}
/// Scheduler that assigns ready tasks to governors one at a time, always
/// taking the currently best-scoring (task, governor) pair.
#[derive(Debug, Clone, Default)]
pub struct ReactiveScheduler {
    /// Tasks seen in the `Ready` state by `schedule` that have not yet been
    /// assigned to a governor (nor dropped through `clear_session`).
    ready_tasks: RcSet<TaskRef>,
}
impl ReactiveScheduler {
/// Picks the best (ready task, governor) pair, or `None` when no ready task
/// currently fits on any governor.
///
/// A governor is a candidate for a task when the task's CPUs plus the
/// governor's `active_resources` do not exceed the governor's CPUs and the
/// task's resources are a subset of the governor's resources.
///
/// The score of a candidate pair is the sum of:
/// * `-(sum over inputs of size * number of governors the input is scheduled on)
///   / number of governors`,
/// * `cpus * 5000`,
/// * the sizes of all inputs already scheduled on the candidate governor.
///
/// The highest score wins; on a tie the pair encountered first is kept.
///
/// # Panics
///
/// Panics if an input object of a ready task has no `info.size`.
fn pick_best(&self, graph: &mut Graph) -> Option<(TaskRef, GovernorRef)> {
    // Without governors nothing can be placed; returning early also protects
    // the division by `n_governors` below from dividing by zero.
    if graph.governors.is_empty() {
        return None;
    }
    let n_governors = graph.governors.len() as i64;

    let mut best: Option<(TaskRef, GovernorRef)> = None;
    let mut best_score = 0i64;

    for tref in &self.ready_tasks {
        let t = tref.get();
        let cpus = t.spec.resources.cpus();

        let mut total_size = 0;
        for input in &t.inputs {
            let o = input.get();
            total_size +=
                o.info.size.expect("missing info.size in finished object") * o.scheduled.len();
        }
        let neg_avg_size = -(total_size as i64) / n_governors;
        // The governor-independent part of the score, computed once per task.
        let base_score = neg_avg_size + cpus as i64 * 5000i64;

        for (_, wref) in &graph.governors {
            let w = wref.get();
            let fits = cpus + w.active_resources <= w.resources.cpus()
                && t.spec.resources.is_subset_of(&w.resources);
            if !fits {
                continue;
            }

            // Reward governors that already hold (or will hold) the inputs.
            let mut score = base_score;
            for input in &t.inputs {
                let o = input.get();
                if o.scheduled.contains(wref) {
                    score += o.info.size.expect("missing info.size in finished object") as i64;
                }
            }

            // Strict comparison: the first pair seen wins ties.
            if best.is_none() || best_score < score {
                best_score = score;
                best = Some((tref.clone(), wref.clone()));
            }
        }
    }
    best
}
/// Drops every task belonging to `session` from the set of ready tasks.
///
/// Tasks of the session that are not in the ready set are simply skipped.
pub fn clear_session(&mut self, session: &SessionRef) {
    let session_guard = session.get();
    for task in &session_guard.tasks {
        self.ready_tasks.remove(&task);
    }
}
/// Runs one scheduling pass and returns what was newly scheduled.
///
/// 1. Every task in `updated.new_tasks` and then `updated.tasks` that is in
///    the `Ready` state is added to the internal ready set.
/// 2. While `pick_best` yields a (task, governor) pair, the task is assigned
///    to that governor: the governor's `active_resources` grows by the
///    task's CPUs, the task and its outputs are recorded on the governor,
///    and the task leaves the ready set.
///
/// Returns an empty `UpdatedOut` without touching anything (including the
/// ready set) when the graph has no governors.
///
/// # Panics
///
/// Panics if a ready task from `updated` could not be inserted into the ready
/// set, or if a picked task is not in the `Ready` state.
pub fn schedule(&mut self, graph: &mut Graph, updated: &UpdatedIn) -> UpdatedOut {
    let mut up_out: UpdatedOut = Default::default();
    if graph.governors.is_empty() {
        return up_out;
    }

    // New and updated tasks are treated identically; `new_tasks` goes first.
    for changed in [&updated.new_tasks, &updated.tasks].iter() {
        for tref in *changed {
            // Only the state is inspected, so a shared borrow is enough.
            let t = tref.get();
            if t.state == TaskState::Ready {
                debug!("Scheduler: New ready task {}", t.id());
                // `insert` is expected to report whether the task was newly added.
                let r = self.ready_tasks.insert(tref.clone());
                assert!(r, "Scheduler: ready task was not newly inserted into ready_tasks");
            }
        }
    }

    debug!("Scheduler started");
    while let Some((tref, wref)) = self.pick_best(graph) {
        {
            // Scope the borrows so they end before the task is moved below.
            let mut w = wref.get_mut();
            let mut t = tref.get_mut();
            assert!(t.state == TaskState::Ready);

            // Reserve the CPUs so the next `pick_best` sees the updated load.
            w.active_resources += t.spec().resources.cpus();
            w.scheduled_tasks.insert(tref.clone());
            w.scheduled_ready_tasks.insert(tref.clone());
            t.scheduled = Some(wref.clone());
            debug!("Scheduler: {} -> {}", t.id(), w.id());

            for oref in &t.outputs {
                w.scheduled_objects.insert(oref.clone());
                oref.get_mut().scheduled.insert(wref.clone());
                // Create the per-governor set lazily, only when it is missing.
                up_out
                    .objects
                    .entry(wref.clone())
                    .or_insert_with(Default::default)
                    .insert(oref.clone());
            }
        }
        self.ready_tasks.remove(&tref);
        up_out.tasks.insert(tref);
    }
    up_out
}
}