mod core;
mod execution;
mod shared;
use crate::context::ContextLifecycle;
use crate::protocol::batch::BatchPair;
use std::sync::mpsc;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use super::{
BatchExecutionResult, ExecutionTask, ExecutionTaskCompletionNotifier, Scheduler,
SchedulerError, SchedulerFactory,
};
impl From<std::sync::PoisonError<std::sync::MutexGuard<'_, shared::Shared>>> for SchedulerError {
fn from(
error: std::sync::PoisonError<std::sync::MutexGuard<'_, shared::Shared>>,
) -> SchedulerError {
SchedulerError::Internal(format!("scheduler shared lock is poisoned: {}", error))
}
}
impl From<std::sync::mpsc::SendError<core::CoreMessage>> for SchedulerError {
fn from(error: std::sync::mpsc::SendError<core::CoreMessage>) -> SchedulerError {
SchedulerError::Internal(format!("scheduler's core thread disconnected: {}", error))
}
}
pub struct SerialScheduler {
shared_lock: Arc<Mutex<shared::Shared>>,
core_tx: Sender<core::CoreMessage>,
task_iterator: Option<Box<dyn Iterator<Item = ExecutionTask> + Send>>,
}
impl SerialScheduler {
pub fn new(
context_lifecycle: Box<dyn ContextLifecycle>,
state_id: String,
) -> Result<SerialScheduler, SchedulerError> {
let (execution_tx, execution_rx) = mpsc::channel();
let (core_tx, core_rx) = mpsc::channel();
let shared_lock = Arc::new(Mutex::new(shared::Shared::new()));
core::SchedulerCore::new(
shared_lock.clone(),
core_rx,
execution_tx,
context_lifecycle,
state_id,
)
.start()?;
Ok(SerialScheduler {
shared_lock,
core_tx: core_tx.clone(),
task_iterator: Some(Box::new(execution::SerialExecutionTaskIterator::new(
core_tx,
execution_rx,
))),
})
}
}
impl Scheduler for SerialScheduler {
fn set_result_callback(
&mut self,
callback: Box<dyn Fn(Option<BatchExecutionResult>) + Send>,
) -> Result<(), SchedulerError> {
self.shared_lock.lock()?.set_result_callback(callback);
Ok(())
}
fn set_error_callback(
&mut self,
callback: Box<dyn Fn(SchedulerError) + Send>,
) -> Result<(), SchedulerError> {
self.shared_lock.lock()?.set_error_callback(callback);
Ok(())
}
fn add_batch(&mut self, batch: BatchPair) -> Result<(), SchedulerError> {
let mut shared = self.shared_lock.lock()?;
if shared.finalized() {
return Err(SchedulerError::SchedulerFinalized);
}
if shared.batch_already_queued(&batch) {
return Err(SchedulerError::DuplicateBatch(
batch.batch().header_signature().into(),
));
}
shared.add_unscheduled_batch(batch);
self.core_tx.send(core::CoreMessage::BatchAdded)?;
Ok(())
}
fn cancel(&mut self) -> Result<Vec<BatchPair>, SchedulerError> {
let mut unscheduled_batches = self.shared_lock.lock()?.drain_unscheduled_batches();
let (sender, receiver) = mpsc::channel();
if self
.core_tx
.send(core::CoreMessage::Cancelled(sender))
.is_err()
&& !self.shared_lock.lock()?.finalized()
{
return Err(SchedulerError::Internal(
"scheduler's core thread disconnected before finalization".into(),
));
}
let aborted_batch = match receiver.recv() {
Ok(batch) => batch,
Err(_) => {
if self.shared_lock.lock()?.finalized() {
None
} else {
return Err(SchedulerError::Internal(
"scheduler's core thread did not return aborted batch".into(),
));
}
}
};
if let Some(batch) = aborted_batch {
let mut new_list = vec![batch];
new_list.append(&mut unscheduled_batches);
unscheduled_batches = new_list;
}
Ok(unscheduled_batches)
}
fn finalize(&mut self) -> Result<(), SchedulerError> {
self.shared_lock.lock()?.set_finalized(true);
self.core_tx.send(core::CoreMessage::Finalized)?;
Ok(())
}
fn take_task_iterator(
&mut self,
) -> Result<Box<dyn Iterator<Item = ExecutionTask> + Send>, SchedulerError> {
self.task_iterator
.take()
.ok_or(SchedulerError::NoTaskIterator)
}
fn new_notifier(&mut self) -> Result<Box<dyn ExecutionTaskCompletionNotifier>, SchedulerError> {
Ok(Box::new(
execution::SerialExecutionTaskCompletionNotifier::new(self.core_tx.clone()),
))
}
}
pub struct SerialSchedulerFactory {
context_lifecycle: Box<dyn ContextLifecycle>,
}
impl SerialSchedulerFactory {
pub fn new(context_lifecycle: Box<dyn ContextLifecycle>) -> Self {
Self { context_lifecycle }
}
}
impl SchedulerFactory for SerialSchedulerFactory {
fn create_scheduler(&self, state_id: String) -> Result<Box<dyn Scheduler>, SchedulerError> {
SerialScheduler::new(self.context_lifecycle.clone(), state_id)
.map(|scheduler| Box::new(scheduler) as Box<dyn Scheduler>)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::scheduler::tests::*;
use crate::scheduler::ExecutionTaskCompletionNotification;
#[test]
fn test_serial_scheduler_add_batch() {
let state_id = String::from("state0");
let context_lifecycle = Box::new(MockContextLifecycle::new());
let mut scheduler =
SerialScheduler::new(context_lifecycle, state_id).expect("Failed to create scheduler");
let batch = test_scheduler_add_batch(&mut scheduler);
assert!(scheduler
.shared_lock
.lock()
.expect("shared lock is poisoned")
.batch_already_queued(&batch));
scheduler.cancel().expect("Failed to cancel");
scheduler.finalize().expect("Failed to finalize");
}
#[test]
fn test_serial_scheduler_cancel() {
let state_id = String::from("state0");
let context_lifecycle = Box::new(MockContextLifecycle::new());
let mut scheduler =
SerialScheduler::new(context_lifecycle, state_id).expect("Failed to create scheduler");
test_scheduler_cancel(&mut scheduler);
assert!(scheduler
.shared_lock
.lock()
.expect("shared lock is poisoned")
.unscheduled_batches_is_empty());
scheduler.cancel().expect("Failed to cancel");
scheduler.finalize().expect("Failed to finalize");
}
#[test]
fn test_serial_scheduler_finalize() {
let state_id = String::from("state0");
let context_lifecycle = Box::new(MockContextLifecycle::new());
let mut scheduler =
SerialScheduler::new(context_lifecycle, state_id).expect("Failed to create scheduler");
test_scheduler_finalize(&mut scheduler);
assert!(scheduler
.shared_lock
.lock()
.expect("shared lock is poisoned")
.finalized());
}
#[test]
pub fn test_serial_scheduler_flow_with_one_transaction() {
let state_id = String::from("state0");
let context_lifecycle = Box::new(MockContextLifecycle::new());
let mut scheduler =
SerialScheduler::new(context_lifecycle, state_id).expect("Failed to create scheduler");
test_scheduler_flow_with_one_transaction(&mut scheduler);
scheduler.cancel().expect("Failed to cancel");
scheduler.finalize().expect("Failed to finalize");
}
#[test]
pub fn test_serial_scheduler_flow_with_two_batches() {
let state_id = String::from("state0");
let context_lifecycle = Box::new(MockContextLifecycle::new());
let mut scheduler =
SerialScheduler::new(context_lifecycle, state_id).expect("Failed to create scheduler");
test_scheduler_flow_with_two_batches(&mut scheduler);
}
#[test]
pub fn test_serial_scheduler_flow_with_multiple_transactions() {
let state_id = String::from("state0");
let context_lifecycle = Box::new(MockContextLifecycle::new());
let mut scheduler =
SerialScheduler::new(context_lifecycle, state_id).expect("Failed to create scheduler");
test_scheduler_flow_with_multiple_transactions(&mut scheduler);
scheduler.cancel().expect("Failed to cancel");
scheduler.finalize().expect("Failed to finalize");
}
#[test]
pub fn test_serial_scheduler_invalid_transaction_invalidates_batch() {
let state_id = String::from("state0");
let context_lifecycle = Box::new(MockContextLifecycle::new());
let mut scheduler =
SerialScheduler::new(context_lifecycle, state_id).expect("Failed to create scheduler");
test_scheduler_invalid_transaction_invalidates_batch(&mut scheduler);
scheduler.cancel().expect("Failed to cancel");
scheduler.finalize().expect("Failed to finalize");
}
#[test]
fn finalize_then_cancel() {
let state_id = String::from("state0");
let context_lifecycle = Box::new(MockContextLifecycle::new());
let mut scheduler =
SerialScheduler::new(context_lifecycle, state_id).expect("Failed to create scheduler");
scheduler.finalize().expect("Failed to finalize");
scheduler.cancel().expect("Failed to cancel");
}
#[test]
fn finalize_then_cancel_after_wait() {
let state_id = String::from("state0");
let context_lifecycle = Box::new(MockContextLifecycle::new());
let mut scheduler =
SerialScheduler::new(context_lifecycle, state_id).expect("Failed to create scheduler");
scheduler.finalize().expect("Failed to finalize");
std::thread::sleep(std::time::Duration::from_secs(1));
scheduler.cancel().expect("Failed to cancel");
}
#[test]
fn test_serial_scheduler_ordering() {
let state_id = String::from("state0");
let context_lifecycle = Box::new(MockContextLifecycle::new());
let mut scheduler =
SerialScheduler::new(context_lifecycle, state_id).expect("Failed to create scheduler");
let transactions = mock_transactions(10);
let batch = mock_batch(transactions.clone());
scheduler
.add_batch(batch.clone())
.expect("Failed to add batch");
scheduler.finalize().expect("Failed to finalize");
let mut task_iterator = scheduler
.take_task_iterator()
.expect("Failed to get task iterator");
let notifier = scheduler
.new_notifier()
.expect("Failed to get new notifier");
let mut transaction_ids = transactions.into_iter();
let (tx, rx) = mpsc::channel();
let first_task_notifier = notifier.clone();
let first_task_txn_id = task_iterator
.next()
.expect("Failed to get 1st task")
.pair()
.transaction()
.header_signature()
.to_string();
assert_eq!(
transaction_ids
.next()
.expect("Failed to get next transaction")
.header_signature(),
&first_task_txn_id,
);
std::thread::Builder::new()
.name("Thread-test_serial_scheduler_ordering".into())
.spawn(move || {
std::thread::sleep(std::time::Duration::from_secs(1));
tx.send(()).expect("Failed to send");
first_task_notifier.notify(ExecutionTaskCompletionNotification::Valid(
mock_context_id(),
first_task_txn_id,
));
})
.expect("Failed to spawn thread");
let second_task_txn_id = task_iterator
.next()
.expect("Failed to get 2nd task")
.pair()
.transaction()
.header_signature()
.to_string();
assert_eq!(
transaction_ids
.next()
.expect("Failed to get next transaction")
.header_signature(),
&second_task_txn_id,
);
rx.try_recv()
.expect("Returned next task before previous completed");
notifier.notify(ExecutionTaskCompletionNotification::Valid(
mock_context_id(),
second_task_txn_id,
));
loop {
match task_iterator.next() {
Some(task) => {
let txn_id = task.pair().transaction().header_signature().to_string();
assert_eq!(
transaction_ids
.next()
.expect("Failed to get next transaction")
.header_signature(),
&txn_id,
);
notifier.notify(ExecutionTaskCompletionNotification::Valid(
mock_context_id(),
txn_id,
));
}
None => break,
}
}
}
}