use crate::protocol::batch::BatchPair;
use crate::scheduler::{
default_error_callback, default_result_callback, BatchExecutionResult, Scheduler,
SchedulerError,
};
use std::collections::HashMap;
pub struct MultiSchedulerShared {
finalized: bool,
result_callback: Box<dyn Fn(Option<BatchExecutionResult>) + Send>,
error_callback: Box<dyn Fn(SchedulerError) + Send>,
pending_results: HashMap<BatchPair, HashMap<usize, BatchExecutionResult>>,
schedulers: Vec<Box<dyn Scheduler>>,
}
impl MultiSchedulerShared {
pub fn new(schedulers: Vec<Box<dyn Scheduler>>) -> Self {
MultiSchedulerShared {
finalized: false,
result_callback: Box::new(default_result_callback),
error_callback: Box::new(default_error_callback),
pending_results: HashMap::new(),
schedulers,
}
}
pub fn finalized(&self) -> bool {
self.finalized
}
pub fn result_callback(&self) -> &(dyn Fn(Option<BatchExecutionResult>) + Send) {
&*self.result_callback
}
pub fn error_callback(&self) -> &(dyn Fn(SchedulerError) + Send) {
&*self.error_callback
}
pub fn set_result_callback(
&mut self,
callback: Box<dyn Fn(Option<BatchExecutionResult>) + Send>,
) {
self.result_callback = callback;
}
pub fn set_error_callback(&mut self, callback: Box<dyn Fn(SchedulerError) + Send>) {
self.error_callback = callback;
}
pub fn batch_already_pending(&self, batch: &BatchPair) -> bool {
self.pending_results.contains_key(batch)
}
pub fn add_batch(&mut self, batch: BatchPair) -> Result<(), SchedulerError> {
for (i, scheduler) in self.schedulers.iter_mut().enumerate() {
scheduler.add_batch(batch.clone()).map_err(|err| {
SchedulerError::Internal(format!(
"failed to add batch to sub-scheduler {}: {}",
i, err
))
})?;
}
self.pending_results.insert(batch, HashMap::new());
Ok(())
}
pub fn cancel(&mut self) -> Result<Vec<BatchPair>, SchedulerError> {
for (i, scheduler) in self.schedulers.iter_mut().enumerate() {
scheduler.cancel().map_err(|err| {
SchedulerError::Internal(format!("failed to cancel sub-scheduler {}: {}", i, err))
})?;
}
Ok(self
.pending_results
.drain()
.map(|(batch, _)| batch)
.collect())
}
pub fn finalize(&mut self) -> Result<(), SchedulerError> {
for (i, scheduler) in self.schedulers.iter_mut().enumerate() {
scheduler.finalize().map_err(|err| {
SchedulerError::Internal(format!("failed to finalize sub-scheduler {}: {}", i, err))
})?;
}
self.finalized = true;
Ok(())
}
pub fn pending_results(&self) -> &HashMap<BatchPair, HashMap<usize, BatchExecutionResult>> {
&self.pending_results
}
pub fn pending_results_mut(
&mut self,
) -> &mut HashMap<BatchPair, HashMap<usize, BatchExecutionResult>> {
&mut self.pending_results
}
pub fn schedulers(&self) -> &Vec<Box<dyn Scheduler>> {
&self.schedulers
}
pub fn schedulers_mut(&mut self) -> &mut Vec<Box<dyn Scheduler>> {
&mut self.schedulers
}
}