use crate::scheduler::{BatchExecutionResult, SchedulerError};
use std::collections::{HashMap, HashSet};
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use super::shared::MultiSchedulerShared;
/// Messages consumed by the multi-scheduler core thread.
///
/// Each variant carries, as its first field, the index of the sub-scheduler it concerns.
pub enum MultiSchedulerCoreMessage {
/// A result for one batch from the given sub-scheduler; `None` signals that the
/// sub-scheduler is done and will send no further results.
BatchResult(usize, Option<BatchExecutionResult>),
/// An error reported by the given sub-scheduler; the core forwards it to the error callback.
SubSchedulerError(usize, SchedulerError),
}
/// State owned by the multi-scheduler core thread, which merges the results of all
/// sub-schedulers and reports them through the callbacks in the shared state.
pub struct MultiSchedulerCore {
/// Shared state providing the sub-scheduler list, the pending per-batch results and the
/// result/error callbacks.
shared_lock: Arc<Mutex<MultiSchedulerShared>>,
/// Channel on which messages concerning sub-schedulers are received.
rx: Receiver<MultiSchedulerCoreMessage>,
/// Indices of sub-schedulers that have sent a `BatchResult` with `None` (i.e. are done).
done_schedulers: HashSet<usize>,
}
impl MultiSchedulerCore {
    /// Creates a core that reads sub-scheduler messages from `rx` and reports through the
    /// result/error callbacks held in the shared state behind `shared_lock`.
    ///
    /// No sub-scheduler starts out marked as done.
    pub fn new(
        shared_lock: Arc<Mutex<MultiSchedulerShared>>,
        rx: Receiver<MultiSchedulerCoreMessage>,
    ) -> Self {
        MultiSchedulerCore {
            shared_lock,
            rx,
            done_schedulers: HashSet::new(),
        }
    }

    /// Processes sub-scheduler messages until every sub-scheduler has reported that it is
    /// done, or until the channel is disconnected.
    ///
    /// Results for a batch are collected from each sub-scheduler. Once all
    /// `shared.schedulers().len()` sub-schedulers have reported for that batch, it is removed
    /// from the pending results. If they all produced the same result, that result is passed
    /// to the result callback. Otherwise a mismatch is reported to the error callback. When
    /// the last sub-scheduler reports that it is done, `None` is passed to the result callback
    /// and the loop ends.
    ///
    /// # Errors
    ///
    /// Returns an error if the shared-state mutex cannot be locked.
    fn run(&mut self) -> Result<(), SchedulerError> {
        loop {
            match self.rx.recv() {
                Ok(MultiSchedulerCoreMessage::BatchResult(scheduler_index, batch_result)) => {
                    let mut shared = self.shared_lock.lock()?;
                    let num_schedulers = shared.schedulers().len();

                    if self.done_schedulers.contains(&scheduler_index) {
                        shared.error_callback()(SchedulerError::Internal(format!(
                            "got callback from sub-scheduler {}, which is already done",
                            scheduler_index,
                        )));
                        continue;
                    }

                    // A `None` result is the sub-scheduler's signal that it has finished.
                    let batch_result = match batch_result {
                        Some(res) => res,
                        None => {
                            self.done_schedulers.insert(scheduler_index);
                            if self.done_schedulers.len() == num_schedulers {
                                if !shared.pending_results().is_empty() {
                                    shared.error_callback()(SchedulerError::Internal(format!(
                                        "all sub-schedulers are done, but some results not \
                                         returned: {:?}",
                                        shared.pending_results(),
                                    )));
                                }
                                shared.result_callback()(None);
                                break;
                            }
                            continue;
                        }
                    };

                    // Clone only the batch key (not the whole result) so the result itself
                    // can be moved into the pending map.
                    let batch = batch_result.batch.clone();
                    let pending_results = shared.pending_results_mut();
                    let batch_done = match pending_results.get_mut(&batch) {
                        Some(result_list) => {
                            result_list.insert(scheduler_index, batch_result);
                            result_list.len() == num_schedulers
                        }
                        None => {
                            shared.error_callback()(SchedulerError::Internal(format!(
                                "got callback from sub-scheduler {} for batch ({}) that's not \
                                 pending",
                                scheduler_index,
                                batch.batch().header_signature(),
                            )));
                            continue;
                        }
                    };

                    if batch_done {
                        let result_list = pending_results
                            .remove(&batch)
                            .expect("batch was present in pending results just above");

                        // Group sub-schedulers by the result they produced; the sub-schedulers
                        // agree exactly when there is a single distinct result.
                        let mut results: HashMap<BatchExecutionResult, HashSet<usize>> =
                            HashMap::new();
                        for (scheduler, result) in result_list {
                            results.entry(result).or_default().insert(scheduler);
                        }

                        if results.len() == 1 {
                            let (result, _) = results
                                .into_iter()
                                .next()
                                .expect("results holds exactly one entry");
                            shared.result_callback()(Some(result));
                        } else {
                            shared.error_callback()(SchedulerError::Internal(format!(
                                "mismatched results for batch {}: {:?}",
                                batch.batch().header_signature(),
                                results,
                            )));
                        }
                    }
                }
                Ok(MultiSchedulerCoreMessage::SubSchedulerError(scheduler_index, err)) => {
                    self.send_scheduler_error(SchedulerError::Internal(format!(
                        "scheduler {} encountered error: {}",
                        scheduler_index, err,
                    )))?;
                }
                Err(err) => {
                    // `recv` only fails once every sender has been dropped, so no further
                    // messages can arrive.
                    warn!("Thread-MultiScheduler recv error: {}", err);
                    break;
                }
            }
        }
        Ok(())
    }

    /// Passes `error` to the error callback in the shared state.
    ///
    /// # Errors
    ///
    /// Returns an error if the shared-state mutex cannot be locked.
    fn send_scheduler_error(&self, error: SchedulerError) -> Result<(), SchedulerError> {
        self.shared_lock.lock()?.error_callback()(error);
        Ok(())
    }

    /// Spawns the `Thread-MultiScheduler` thread, which runs the core until it finishes.
    ///
    /// If the core stops because of an error, that error is reported through the error
    /// callback. If the shared-state mutex cannot be locked for that, the error is logged
    /// instead.
    ///
    /// # Errors
    ///
    /// Returns an error if the thread could not be spawned.
    pub fn start(mut self) -> Result<std::thread::JoinHandle<()>, SchedulerError> {
        std::thread::Builder::new()
            .name(String::from("Thread-MultiScheduler"))
            .spawn(move || {
                if let Err(err) = self.run() {
                    let error = SchedulerError::Internal(format!(
                        "multi-scheduler's internal thread ended due to error: {}",
                        err
                    ));
                    self.send_scheduler_error(error.clone())
                        .unwrap_or_else(|_| error!("{}", error));
                }
            })
            .map_err(|err| {
                SchedulerError::Internal(format!(
                    "could not build a thread for the scheduler: {}",
                    err
                ))
            })
    }
}