pub mod multi;
pub mod parallel;
pub mod serial;
use std::error::Error;
use crate::context::ContextId;
use crate::protocol::batch::BatchPair;
use crate::protocol::receipt::TransactionReceipt;
use crate::protocol::transaction::TransactionPair;
pub struct ExecutionTask {
pair: TransactionPair,
context_id: ContextId,
}
impl ExecutionTask {
pub fn new(pair: TransactionPair, context_id: ContextId) -> Self {
ExecutionTask { pair, context_id }
}
pub fn pair(&self) -> &TransactionPair {
&self.pair
}
pub fn context_id(&self) -> &ContextId {
&self.context_id
}
pub fn take(self) -> (TransactionPair, ContextId) {
(self.pair, self.context_id)
}
}
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub struct InvalidTransactionResult {
pub transaction_id: String,
pub error_message: String,
pub error_data: Vec<u8>,
}
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub enum TransactionExecutionResult {
Invalid(InvalidTransactionResult),
Valid(TransactionReceipt),
}
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub struct BatchExecutionResult {
pub batch: BatchPair,
pub results: Vec<TransactionExecutionResult>,
}
#[derive(Debug, PartialEq)]
pub enum ExecutionTaskCompletionNotification {
Invalid(ContextId, InvalidTransactionResult),
Valid(ContextId, String),
}
#[derive(Clone, Debug)]
pub enum SchedulerError {
DuplicateBatch(String),
Internal(String),
NoTaskIterator,
SchedulerFinalized,
UnexpectedNotification(String),
}
impl Error for SchedulerError {}
impl std::fmt::Display for SchedulerError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match *self {
SchedulerError::DuplicateBatch(ref batch_id) => {
write!(f, "duplicate batch added to scheduler: {}", batch_id)
}
SchedulerError::Internal(ref err) => {
write!(f, "scheduler encountered an internal error: {}", err)
}
SchedulerError::NoTaskIterator => write!(f, "task iterator already taken"),
SchedulerError::SchedulerFinalized => write!(f, "batch added to finalized scheduler"),
SchedulerError::UnexpectedNotification(ref txn_id) => write!(
f,
"scheduler received an unexpected notification: {}",
txn_id
),
}
}
}
pub trait Scheduler {
fn set_result_callback(
&mut self,
callback: Box<Fn(Option<BatchExecutionResult>) + Send>,
) -> Result<(), SchedulerError>;
fn set_error_callback(
&mut self,
callback: Box<Fn(SchedulerError) + Send>,
) -> Result<(), SchedulerError>;
fn add_batch(&mut self, batch: BatchPair) -> Result<(), SchedulerError>;
fn cancel(&mut self) -> Result<Vec<BatchPair>, SchedulerError>;
fn finalize(&mut self) -> Result<(), SchedulerError>;
fn take_task_iterator(
&mut self,
) -> Result<Box<dyn Iterator<Item = ExecutionTask> + Send>, SchedulerError>;
fn new_notifier(&mut self) -> Result<Box<dyn ExecutionTaskCompletionNotifier>, SchedulerError>;
}
pub trait ExecutionTaskCompletionNotifier: Send {
fn notify(&self, notification: ExecutionTaskCompletionNotification);
fn clone_box(&self) -> Box<dyn ExecutionTaskCompletionNotifier>;
}
impl Clone for Box<ExecutionTaskCompletionNotifier> {
fn clone(&self) -> Self {
self.clone_box()
}
}
fn default_result_callback(batch_result: Option<BatchExecutionResult>) {
warn!(
"No result callback set; dropping batch execution result: {}",
match batch_result {
Some(ref result) => result.batch.batch().header_signature(),
None => "None",
}
);
}
fn default_error_callback(error: SchedulerError) {
error!("No error callback set; SchedulerError: {}", error);
}
#[cfg(test)]
mod tests {
use super::*;
use crate::context::manager::ContextManagerError;
use crate::context::ContextLifecycle;
use crate::protocol::batch::BatchBuilder;
use crate::protocol::receipt::TransactionReceiptBuilder;
use crate::protocol::transaction::{HashMethod, Transaction, TransactionBuilder};
use crate::signing::hash::HashSigner;
use std::sync::mpsc;
pub fn mock_transactions(num: u8) -> Vec<Transaction> {
(0..num)
.map(|i| {
TransactionBuilder::new()
.with_family_name("mock".into())
.with_family_version("0.1".into())
.with_inputs(vec![])
.with_outputs(vec![])
.with_nonce(vec![i])
.with_payload(vec![])
.with_payload_hash_method(HashMethod::SHA512)
.build(&HashSigner::default())
.expect("Failed to build transaction")
})
.collect()
}
pub fn mock_batch(transactions: Vec<Transaction>) -> BatchPair {
BatchBuilder::new()
.with_transactions(transactions)
.build_pair(&HashSigner::default())
.expect("Failed to build batch pair")
}
pub fn mock_batch_with_num_txns(num: u8) -> BatchPair {
mock_batch(mock_transactions(num))
}
pub fn mock_batches_with_one_transaction(num_batches: u8) -> Vec<BatchPair> {
mock_transactions(num_batches)
.into_iter()
.map(|txn| mock_batch(vec![txn]))
.collect()
}
pub fn valid_result_from_batch(batch: BatchPair) -> Option<BatchExecutionResult> {
let results = batch
.batch()
.transactions()
.iter()
.map(|txn| {
TransactionExecutionResult::Valid(TransactionReceipt {
state_changes: vec![],
events: vec![],
data: vec![],
transaction_id: txn.header_signature().into(),
})
})
.collect();
Some(BatchExecutionResult { batch, results })
}
pub fn invalid_result_from_batch(batch: BatchPair) -> Option<BatchExecutionResult> {
let results = batch
.batch()
.transactions()
.iter()
.map(|txn| {
TransactionExecutionResult::Invalid(InvalidTransactionResult {
transaction_id: txn.header_signature().into(),
error_message: String::new(),
error_data: vec![],
})
})
.collect();
Some(BatchExecutionResult { batch, results })
}
pub fn mock_context_id() -> ContextId {
[
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e,
0x0f, 0x10,
]
}
#[derive(Clone)]
pub struct MockContextLifecycle {}
impl MockContextLifecycle {
pub fn new() -> Self {
MockContextLifecycle {}
}
}
impl ContextLifecycle for MockContextLifecycle {
fn create_context(
&mut self,
_dependent_contexts: &[ContextId],
_state_id: &str,
) -> ContextId {
[
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x01,
]
}
fn get_transaction_receipt(
&self,
_context_id: &ContextId,
transaction_id: &str,
) -> Result<TransactionReceipt, ContextManagerError> {
TransactionReceiptBuilder::new()
.with_transaction_id(transaction_id.into())
.build()
.map_err(|err| ContextManagerError::from(err))
}
fn drop_context(&mut self, _context_id: ContextId) {}
}
pub fn test_scheduler_add_batch(scheduler: &mut Scheduler) -> BatchPair {
let batch = mock_batch_with_num_txns(1);
scheduler
.add_batch(batch.clone())
.expect("Failed to add batch");
match scheduler.add_batch(batch.clone()) {
Err(SchedulerError::DuplicateBatch(batch_id)) => {
assert_eq!(batch_id, batch.batch().header_signature())
}
res => panic!("Did not get DuplicateBatch; got {:?}", res),
}
batch
}
pub fn test_scheduler_cancel(scheduler: &mut Scheduler) {
let batches = mock_batches_with_one_transaction(2);
scheduler
.add_batch(batches[0].clone())
.expect("Failed to add 1st batch");
scheduler
.add_batch(batches[1].clone())
.expect("Failed to add 2nd batch");
for batch in scheduler.cancel().expect("Failed 1st cancel") {
assert!(batches.contains(&batch));
}
assert!(scheduler.cancel().expect("Failed 2nd cancel").is_empty());
}
pub fn test_scheduler_finalize(scheduler: &mut Scheduler) {
let (tx, rx) = mpsc::channel();
scheduler
.set_result_callback(Box::new(move |result| {
tx.send(result).expect("Failed to send result");
}))
.expect("Failed to set result callback");
let mut task_iterator = scheduler
.take_task_iterator()
.expect("Failed to get task iterator");
scheduler.finalize().expect("Failed to finalize");
assert!(rx.recv().expect("Failed to receive result").is_none());
match scheduler.add_batch(mock_batch_with_num_txns(1)) {
Err(SchedulerError::SchedulerFinalized) => (),
res => panic!("Did not get SchedulerFinalized; got {:?}", res),
}
assert!(task_iterator.next().is_none());
}
pub fn test_scheduler_flow_with_one_transaction(scheduler: &mut Scheduler) {
let (tx, rx) = mpsc::channel();
scheduler
.set_result_callback(Box::new(move |result| {
tx.send(result).expect("Failed to send result");
}))
.expect("Failed to set result callback");
let batch = mock_batch_with_num_txns(1);
scheduler
.add_batch(batch.clone())
.expect("Failed to add batch");
let mut task_iterator = scheduler
.take_task_iterator()
.expect("Failed to get task iterator");
let notifier = scheduler
.new_notifier()
.expect("Failed to get new notifier");
notifier.notify(ExecutionTaskCompletionNotification::Invalid(
mock_context_id(),
InvalidTransactionResult {
transaction_id: task_iterator
.next()
.expect("Failed to get task")
.pair()
.transaction()
.header_signature()
.into(),
error_message: String::new(),
error_data: vec![],
},
));
let result = rx.recv().expect("Failed to receive result");
assert_eq!(result, invalid_result_from_batch(batch));
}
pub fn test_scheduler_flow_with_multiple_transactions(scheduler: &mut Scheduler) {
let (tx, rx) = mpsc::channel();
scheduler
.set_result_callback(Box::new(move |result| {
tx.send(result).expect("Failed to send result");
}))
.expect("Failed to set result callback");
let original_batch = mock_batch_with_num_txns(3);
scheduler
.add_batch(original_batch.clone())
.expect("Failed to add batch");
let mut task_iterator = scheduler
.take_task_iterator()
.expect("Failed to get task iterator");
let notifier = scheduler
.new_notifier()
.expect("Failed to get new notifier");
notifier.notify(ExecutionTaskCompletionNotification::Valid(
mock_context_id(),
task_iterator
.next()
.expect("Failed to get task")
.pair()
.transaction()
.header_signature()
.into(),
));
notifier.notify(ExecutionTaskCompletionNotification::Valid(
mock_context_id(),
task_iterator
.next()
.expect("Failed to get task")
.pair()
.transaction()
.header_signature()
.into(),
));
notifier.notify(ExecutionTaskCompletionNotification::Valid(
mock_context_id(),
task_iterator
.next()
.expect("Failed to get task")
.pair()
.transaction()
.header_signature()
.into(),
));
let BatchExecutionResult { batch, results } = rx
.recv()
.expect("Failed to receive result")
.expect("Got None result");
assert_eq!(batch, original_batch);
let original_batch_txn_ids = original_batch
.batch()
.transactions()
.iter()
.map(|txn| txn.header_signature())
.collect::<Vec<_>>()
.sort_unstable();
let result_txn_ids = results
.iter()
.map(|result| match result {
TransactionExecutionResult::Valid(receipt) => &receipt.transaction_id,
res => panic!("Did not get valid result; got {:?}", res),
})
.collect::<Vec<_>>()
.sort_unstable();
assert_eq!(original_batch_txn_ids, result_txn_ids);
}
pub fn test_scheduler_invalid_transaction_invalidates_batch(scheduler: &mut Scheduler) {
let (tx, rx) = mpsc::channel();
scheduler
.set_result_callback(Box::new(move |result| {
tx.send(result).expect("Failed to send result");
}))
.expect("Failed to set result callback");
let original_batch = mock_batch_with_num_txns(3);
scheduler
.add_batch(original_batch.clone())
.expect("Failed to add batch");
let mut task_iterator = scheduler
.take_task_iterator()
.expect("Failed to get task iterator");
let notifier = scheduler
.new_notifier()
.expect("Failed to get new notifier");
notifier.notify(ExecutionTaskCompletionNotification::Valid(
mock_context_id(),
task_iterator
.next()
.expect("Failed to get task")
.pair()
.transaction()
.header_signature()
.into(),
));
notifier.notify(ExecutionTaskCompletionNotification::Invalid(
mock_context_id(),
InvalidTransactionResult {
transaction_id: task_iterator
.next()
.expect("Failed to get task")
.pair()
.transaction()
.header_signature()
.into(),
error_message: String::new(),
error_data: vec![],
},
));
let BatchExecutionResult { batch, results } = rx
.recv()
.expect("Failed to receive result")
.expect("Got None result");
assert_eq!(batch, original_batch);
let original_batch_txn_ids = original_batch
.batch()
.transactions()
.iter()
.map(|txn| txn.header_signature())
.collect::<Vec<_>>()
.sort_unstable();
let result_txn_ids = results
.iter()
.map(|result| match result {
TransactionExecutionResult::Invalid(invalid_res) => &invalid_res.transaction_id,
res => panic!("Did not get invalid result; got {:?}", res),
})
.collect::<Vec<_>>()
.sort_unstable();
assert_eq!(original_batch_txn_ids, result_txn_ids);
}
pub fn test_scheduler_unexpected_notification(scheduler: &mut Scheduler) {
let (tx, rx) = mpsc::channel();
scheduler
.set_error_callback(Box::new(move |err| {
tx.send(err).expect("Failed to send error");
}))
.expect("Failed to set error callback");
let txn_id = "mock-id".to_string();
let notifier = scheduler
.new_notifier()
.expect("Failed to get new notifier");
notifier.notify(ExecutionTaskCompletionNotification::Valid(
mock_context_id(),
txn_id.clone(),
));
match rx.recv().expect("Failed to receive result") {
SchedulerError::UnexpectedNotification(unexpected_id) => {
assert_eq!(unexpected_id, txn_id)
}
err => panic!("Received unexpected error: {}", err),
}
}
}