pub mod multi;
pub mod parallel;
pub mod serial;
use std::error::Error;
use crate::context::ContextId;
use crate::protocol::batch::BatchPair;
use crate::protocol::receipt::{TransactionReceipt, TransactionResult};
use crate::protocol::transaction::TransactionPair;
pub struct ExecutionTask {
pair: TransactionPair,
context_id: ContextId,
}
impl ExecutionTask {
pub fn new(pair: TransactionPair, context_id: ContextId) -> Self {
ExecutionTask { pair, context_id }
}
pub fn pair(&self) -> &TransactionPair {
&self.pair
}
pub fn context_id(&self) -> &ContextId {
&self.context_id
}
pub fn take(self) -> (TransactionPair, ContextId) {
(self.pair, self.context_id)
}
}
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub struct BatchExecutionResult {
pub batch: BatchPair,
pub receipts: Vec<TransactionReceipt>,
}
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub struct InvalidTransactionResult {
pub transaction_id: String,
pub error_message: String,
pub error_data: Vec<u8>,
}
impl Into<TransactionReceipt> for InvalidTransactionResult {
fn into(self) -> TransactionReceipt {
TransactionReceipt {
transaction_id: self.transaction_id,
transaction_result: TransactionResult::Invalid {
error_message: self.error_message,
error_data: self.error_data,
},
}
}
}
#[derive(Debug, PartialEq)]
pub enum ExecutionTaskCompletionNotification {
Invalid(ContextId, InvalidTransactionResult),
Valid(ContextId, String),
}
#[derive(Clone, Debug)]
pub enum SchedulerError {
DuplicateBatch(String),
Internal(String),
NoTaskIterator,
SchedulerFinalized,
UnexpectedNotification(String),
}
impl Error for SchedulerError {}
impl std::fmt::Display for SchedulerError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match *self {
SchedulerError::DuplicateBatch(ref batch_id) => {
write!(f, "duplicate batch added to scheduler: {}", batch_id)
}
SchedulerError::Internal(ref err) => {
write!(f, "scheduler encountered an internal error: {}", err)
}
SchedulerError::NoTaskIterator => write!(f, "task iterator already taken"),
SchedulerError::SchedulerFinalized => write!(f, "batch added to finalized scheduler"),
SchedulerError::UnexpectedNotification(ref txn_id) => write!(
f,
"scheduler received an unexpected notification: {}",
txn_id
),
}
}
}
pub trait Scheduler: Send {
fn set_result_callback(
&mut self,
callback: Box<dyn Fn(Option<BatchExecutionResult>) + Send>,
) -> Result<(), SchedulerError>;
fn set_error_callback(
&mut self,
callback: Box<dyn Fn(SchedulerError) + Send>,
) -> Result<(), SchedulerError>;
fn add_batch(&mut self, batch: BatchPair) -> Result<(), SchedulerError>;
fn cancel(&mut self) -> Result<Vec<BatchPair>, SchedulerError>;
fn finalize(&mut self) -> Result<(), SchedulerError>;
fn take_task_iterator(
&mut self,
) -> Result<Box<dyn Iterator<Item = ExecutionTask> + Send>, SchedulerError>;
fn new_notifier(&mut self) -> Result<Box<dyn ExecutionTaskCompletionNotifier>, SchedulerError>;
}
pub trait SchedulerFactory: Send {
fn create_scheduler(&self, state_id: String) -> Result<Box<dyn Scheduler>, SchedulerError>;
}
pub trait ExecutionTaskCompletionNotifier: Send {
fn notify(&self, notification: ExecutionTaskCompletionNotification);
fn clone_box(&self) -> Box<dyn ExecutionTaskCompletionNotifier>;
}
impl Clone for Box<dyn ExecutionTaskCompletionNotifier> {
fn clone(&self) -> Self {
self.clone_box()
}
}
fn default_result_callback(batch_result: Option<BatchExecutionResult>) {
warn!(
"No result callback set; dropping batch execution result: {}",
match batch_result {
Some(ref result) => result.batch.batch().header_signature(),
None => "None",
}
);
}
fn default_error_callback(error: SchedulerError) {
error!("No error callback set; SchedulerError: {}", error);
}
#[cfg(test)]
mod tests {
use super::*;
use crate::context::manager::ContextManagerError;
use crate::context::ContextLifecycle;
use crate::protocol::batch::BatchBuilder;
use crate::protocol::receipt::TransactionReceiptBuilder;
use crate::protocol::transaction::{HashMethod, Transaction, TransactionBuilder};
use std::sync::mpsc;
use cylinder::{secp256k1::Secp256k1Context, Context, Signer};
pub fn mock_transactions(num: u8) -> Vec<Transaction> {
let signer = new_signer();
(0..num)
.map(|i| {
TransactionBuilder::new()
.with_family_name("mock".into())
.with_family_version("0.1".into())
.with_inputs(vec![])
.with_outputs(vec![])
.with_nonce(vec![i])
.with_payload(vec![])
.with_payload_hash_method(HashMethod::SHA512)
.build(&*signer)
.expect("Failed to build transaction")
})
.collect()
}
pub fn mock_batch(transactions: Vec<Transaction>) -> BatchPair {
let signer = new_signer();
BatchBuilder::new()
.with_transactions(transactions)
.build_pair(&*signer)
.expect("Failed to build batch pair")
}
pub fn mock_batch_with_num_txns(num: u8) -> BatchPair {
mock_batch(mock_transactions(num))
}
pub fn mock_batches_with_one_transaction(num_batches: u8) -> Vec<BatchPair> {
mock_transactions(num_batches)
.into_iter()
.map(|txn| mock_batch(vec![txn]))
.collect()
}
fn new_signer() -> Box<dyn Signer> {
let context = Secp256k1Context::new();
let key = context.new_random_private_key();
context.new_signer(key)
}
pub fn valid_receipt_from_batch(batch: BatchPair) -> Option<BatchExecutionResult> {
let receipts = batch
.batch()
.transactions()
.iter()
.map(|txn| TransactionReceipt {
transaction_id: txn.header_signature().into(),
transaction_result: TransactionResult::Valid {
state_changes: vec![],
events: vec![],
data: vec![],
},
})
.collect();
Some(BatchExecutionResult { batch, receipts })
}
pub fn invalid_receipt_from_batch(batch: BatchPair) -> Option<BatchExecutionResult> {
let receipts = batch
.batch()
.transactions()
.iter()
.map(|txn| TransactionReceipt {
transaction_id: txn.header_signature().into(),
transaction_result: TransactionResult::Invalid {
error_message: String::new(),
error_data: vec![],
},
})
.collect();
Some(BatchExecutionResult { batch, receipts })
}
pub fn mock_context_id() -> ContextId {
[
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e,
0x0f, 0x10,
]
}
#[derive(Clone)]
pub struct MockContextLifecycle {}
impl MockContextLifecycle {
pub fn new() -> Self {
MockContextLifecycle {}
}
}
impl ContextLifecycle for MockContextLifecycle {
fn create_context(
&mut self,
_dependent_contexts: &[ContextId],
_state_id: &str,
) -> ContextId {
[
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x01,
]
}
fn get_transaction_receipt(
&self,
_context_id: &ContextId,
transaction_id: &str,
) -> Result<TransactionReceipt, ContextManagerError> {
TransactionReceiptBuilder::new()
.valid()
.with_transaction_id(transaction_id.into())
.build()
.map_err(|err| ContextManagerError::from(err))
}
fn drop_context(&mut self, _context_id: ContextId) {}
fn clone_box(&self) -> Box<dyn ContextLifecycle> {
Box::new(self.clone())
}
}
pub fn test_scheduler_add_batch(scheduler: &mut dyn Scheduler) -> BatchPair {
let batch = mock_batch_with_num_txns(1);
scheduler
.add_batch(batch.clone())
.expect("Failed to add batch");
match scheduler.add_batch(batch.clone()) {
Err(SchedulerError::DuplicateBatch(batch_id)) => {
assert_eq!(batch_id, batch.batch().header_signature())
}
res => panic!("Did not get DuplicateBatch; got {:?}", res),
}
batch
}
pub fn test_scheduler_cancel(scheduler: &mut dyn Scheduler) {
let batches = mock_batches_with_one_transaction(2);
scheduler
.add_batch(batches[0].clone())
.expect("Failed to add 1st batch");
scheduler
.add_batch(batches[1].clone())
.expect("Failed to add 2nd batch");
for batch in scheduler.cancel().expect("Failed 1st cancel") {
assert!(batches.contains(&batch));
}
assert!(scheduler.cancel().expect("Failed 2nd cancel").is_empty());
}
pub fn test_scheduler_finalize(scheduler: &mut dyn Scheduler) {
let (tx, rx) = mpsc::channel();
scheduler
.set_result_callback(Box::new(move |result| {
tx.send(result).expect("Failed to send result");
}))
.expect("Failed to set result callback");
let mut task_iterator = scheduler
.take_task_iterator()
.expect("Failed to get task iterator");
scheduler.finalize().expect("Failed to finalize");
assert!(rx.recv().expect("Failed to receive result").is_none());
match scheduler.add_batch(mock_batch_with_num_txns(1)) {
Err(SchedulerError::SchedulerFinalized) => (),
res => panic!("Did not get SchedulerFinalized; got {:?}", res),
}
assert!(task_iterator.next().is_none());
}
pub fn test_scheduler_flow_with_one_transaction(scheduler: &mut dyn Scheduler) {
let (tx, rx) = mpsc::channel();
scheduler
.set_result_callback(Box::new(move |result| {
if result.is_some() {
tx.send(result).expect("Failed to send result");
}
}))
.expect("Failed to set result callback");
let batch = mock_batch_with_num_txns(1);
scheduler
.add_batch(batch.clone())
.expect("Failed to add batch");
let mut task_iterator = scheduler
.take_task_iterator()
.expect("Failed to get task iterator");
let notifier = scheduler
.new_notifier()
.expect("Failed to get new notifier");
notifier.notify(ExecutionTaskCompletionNotification::Invalid(
mock_context_id(),
InvalidTransactionResult {
transaction_id: task_iterator
.next()
.expect("Failed to get task")
.pair()
.transaction()
.header_signature()
.into(),
error_message: String::new(),
error_data: vec![],
},
));
let result = rx.recv().expect("Failed to receive result");
assert_eq!(result, invalid_receipt_from_batch(batch));
}
pub fn test_scheduler_flow_with_two_batches(scheduler: &mut dyn Scheduler) {
let (tx, rx) = mpsc::channel();
scheduler
.set_result_callback(Box::new(move |result| {
tx.send(result).expect("Failed to send result");
}))
.expect("Failed to set result callback");
let batches = mock_batches_with_one_transaction(2);
for batch in batches.iter() {
scheduler
.add_batch(batch.clone())
.expect("Failed to add batch");
}
scheduler.finalize().expect("Unable to finalize scheduler");
let mut task_iterator = scheduler
.take_task_iterator()
.expect("Failed to get task iterator");
let notifier = scheduler
.new_notifier()
.expect("Failed to get new notifier");
notifier.notify(ExecutionTaskCompletionNotification::Invalid(
mock_context_id(),
InvalidTransactionResult {
transaction_id: task_iterator
.next()
.expect("Failed to get task")
.pair()
.transaction()
.header_signature()
.into(),
error_message: String::new(),
error_data: vec![],
},
));
notifier.notify(ExecutionTaskCompletionNotification::Invalid(
mock_context_id(),
InvalidTransactionResult {
transaction_id: task_iterator
.next()
.expect("Failed to get task")
.pair()
.transaction()
.header_signature()
.into(),
error_message: String::new(),
error_data: vec![],
},
));
for batch in batches {
let result = rx.recv().expect("Failed to receive result");
assert_eq!(result, invalid_receipt_from_batch(batch));
}
let result = rx.recv().expect("Failed to receive result");
assert_eq!(result, None);
}
pub fn test_scheduler_flow_with_multiple_transactions(scheduler: &mut dyn Scheduler) {
let (tx, rx) = mpsc::channel();
scheduler
.set_result_callback(Box::new(move |result| {
if result.is_some() {
tx.send(result).expect("Failed to send result");
}
}))
.expect("Failed to set result callback");
let original_batch = mock_batch_with_num_txns(3);
scheduler
.add_batch(original_batch.clone())
.expect("Failed to add batch");
let mut task_iterator = scheduler
.take_task_iterator()
.expect("Failed to get task iterator");
let notifier = scheduler
.new_notifier()
.expect("Failed to get new notifier");
notifier.notify(ExecutionTaskCompletionNotification::Valid(
mock_context_id(),
task_iterator
.next()
.expect("Failed to get task")
.pair()
.transaction()
.header_signature()
.into(),
));
notifier.notify(ExecutionTaskCompletionNotification::Valid(
mock_context_id(),
task_iterator
.next()
.expect("Failed to get task")
.pair()
.transaction()
.header_signature()
.into(),
));
notifier.notify(ExecutionTaskCompletionNotification::Valid(
mock_context_id(),
task_iterator
.next()
.expect("Failed to get task")
.pair()
.transaction()
.header_signature()
.into(),
));
let BatchExecutionResult { batch, receipts } = rx
.recv()
.expect("Failed to receive result")
.expect("Got None result");
assert_eq!(batch, original_batch);
let original_batch_txn_ids = original_batch
.batch()
.transactions()
.iter()
.map(|txn| txn.header_signature())
.collect::<Vec<_>>()
.sort_unstable();
let receipt_txn_ids = receipts
.iter()
.map(|receipt| match &receipt.transaction_result {
TransactionResult::Valid { .. } => &receipt.transaction_id,
res => panic!("Did not get valid receipt; got {:?}", res),
})
.collect::<Vec<_>>()
.sort_unstable();
assert_eq!(original_batch_txn_ids, receipt_txn_ids);
}
pub fn test_scheduler_invalid_transaction_invalidates_batch(scheduler: &mut dyn Scheduler) {
let (tx, rx) = mpsc::channel();
scheduler
.set_result_callback(Box::new(move |result| {
if result.is_some() {
tx.send(result).expect("Failed to send result");
}
}))
.expect("Failed to set result callback");
let original_batch = mock_batch_with_num_txns(3);
scheduler
.add_batch(original_batch.clone())
.expect("Failed to add batch");
let mut task_iterator = scheduler
.take_task_iterator()
.expect("Failed to get task iterator");
let notifier = scheduler
.new_notifier()
.expect("Failed to get new notifier");
notifier.notify(ExecutionTaskCompletionNotification::Valid(
mock_context_id(),
task_iterator
.next()
.expect("Failed to get task")
.pair()
.transaction()
.header_signature()
.into(),
));
notifier.notify(ExecutionTaskCompletionNotification::Invalid(
mock_context_id(),
InvalidTransactionResult {
transaction_id: task_iterator
.next()
.expect("Failed to get task")
.pair()
.transaction()
.header_signature()
.into(),
error_message: String::new(),
error_data: vec![],
},
));
let BatchExecutionResult { batch, receipts } = rx
.recv()
.expect("Failed to receive result")
.expect("Got None result");
assert_eq!(batch, original_batch);
let original_batch_txn_ids = original_batch
.batch()
.transactions()
.iter()
.map(|txn| txn.header_signature())
.collect::<Vec<_>>()
.sort_unstable();
let receipt_txn_ids = receipts
.iter()
.map(|receipt| match &receipt.transaction_result {
TransactionResult::Invalid { .. } => &receipt.transaction_id,
res => panic!("Did not get invalid receipt; got {:?}", res),
})
.collect::<Vec<_>>()
.sort_unstable();
assert_eq!(original_batch_txn_ids, receipt_txn_ids);
}
}