use std::collections::{BTreeMap, HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::JoinHandle;
use log::warn;
use crate::transact::execution::adapter::{ExecutionAdapter, ExecutionAdapterError};
use crate::transact::execution::{ExecutionRegistry, TransactionFamily};
use crate::transact::scheduler::{ExecutionTask, ExecutionTaskCompletionNotifier};
use super::reader::ExecutionTaskReader;
/// A unit of work for an execution adapter: the notifier that is told about the outcome,
/// paired with the task to execute.
pub type ExecutionEvent = (Box<dyn ExecutionTaskCompletionNotifier>, ExecutionTask);
/// Messages delivered to an execution adapter thread.
pub enum ExecutionCommand {
/// An execution event for the adapter to execute.
Event(Box<ExecutionEvent>),
/// Tells the adapter thread to leave its receive loop, after which it stops the adapter.
Sentinel,
}
/// A request to change which adapter senders serve a transaction family.
pub enum RegistrationChange {
/// Remove the given sender from the set registered for the transaction family.
UnregisterRequest((TransactionFamily, NamedExecutionEventSender)),
/// Add the given sender to the set registered for the transaction family; events parked
/// for that family are released for another delivery attempt.
RegisterRequest((TransactionFamily, NamedExecutionEventSender)),
}
/// Commands processed by the internal executor thread.
pub enum ExecutorCommand {
/// Register or unregister an adapter sender for a transaction family.
RegistrationChange(RegistrationChange),
/// Deliver an execution event to a registered adapter, or park it when none is registered
/// for its transaction family.
Execution(Box<ExecutionEvent>),
/// Start a new `ExecutionTaskReader` with the given task iterator and notifier.
CreateReader(
Box<dyn Iterator<Item = ExecutionTask> + Send>,
Box<dyn ExecutionTaskCompletionNotifier>,
),
/// The reader with the given id is done; it is removed from the reader table and stopped.
ReaderDone(usize),
/// End the command loop; registered adapter threads are then sent a sentinel and the
/// remaining readers are stopped.
Shutdown,
}
/// Sending half of the channel carrying `ExecutorCommand`s to the internal executor thread.
pub type ExecutorCommandSender = Sender<ExecutorCommand>;
/// Receiving half of the channel carrying `ExecutorCommand`s.
pub type ExecutorCommandReceiver = Receiver<ExecutorCommand>;
/// Sending half of a channel carrying `ExecutionCommand`s to an execution adapter thread.
pub type ExecutionEventSender = Sender<ExecutionCommand>;
/// Receiving half of a channel carrying `ExecutionCommand`s.
pub type ExecutionEventReceiver = Receiver<ExecutionCommand>;
/// Execution events held back until an adapter registers for their transaction family.
pub type ParkedExecutionEvents = Vec<ExecutionEvent>;
/// Parked execution events, grouped by the transaction family they are waiting on.
pub type ParkedExecutionEventsMap = HashMap<TransactionFamily, ParkedExecutionEvents>;
/// An execution event sender tagged with a numeric name.
///
/// Equality and hashing look at `name` only, which is what allows these senders to be kept
/// in a `HashSet`: two values with the same name are treated as the same sender.
#[derive(Clone)]
pub struct NamedExecutionEventSender {
    pub sender: ExecutionEventSender,
    name: usize,
}

impl NamedExecutionEventSender {
    /// Wraps `sender` and identifies it by `name`.
    pub fn new(sender: ExecutionEventSender, name: usize) -> Self {
        Self { sender, name }
    }
}

impl Hash for NamedExecutionEventSender {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Only the name is fed to the hasher, mirroring `PartialEq` below.
        Hash::hash(&self.name, state);
    }
}

impl PartialEq for NamedExecutionEventSender {
    fn eq(&self, rhs: &Self) -> bool {
        self.name == rhs.name
    }
}

impl Eq for NamedExecutionEventSender {}
/// Errors returned by `ExecutorThread::start`.
#[derive(Debug)]
pub enum ExecutorThreadError {
    /// `start` was called while the executor thread was in a state that does not allow it.
    InvalidState,
    /// A resource needed for operation could not be accessed.
    ResourcesUnavailable,
}

impl std::fmt::Display for ExecutorThreadError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Pick the fixed message for the variant first, then emit it in one place.
        let message = match self {
            ExecutorThreadError::InvalidState => {
                "Invalid State: ExecutorThread in an invalid state when 'start' was called"
            }
            ExecutorThreadError::ResourcesUnavailable => {
                "ResourcesUnavailable: ExecutorThread unable to access a resource needed for operation"
            }
        };
        f.write_str(message)
    }
}

impl std::error::Error for ExecutorThreadError {}
/// Owns the execution adapters and the threads that run them: one worker thread per adapter
/// plus an internal thread that processes `ExecutorCommand`s.
pub struct ExecutorThread {
// Adapters waiting to be started; `start` drains this list.
execution_adapters: Vec<Box<dyn ExecutionAdapter>>,
// Handles of the per-adapter worker threads spawned by `start`.
join_handles: Vec<JoinHandle<()>>,
// Handle of the internal executor thread; joined by `stop`.
internal_thread: Option<JoinHandle<()>>,
// Command channel into the internal thread; `None` until `start` sets it.
sender: Option<ExecutorCommandSender>,
}
impl ExecutorThread {
pub fn new(execution_adapters: Vec<Box<dyn ExecutionAdapter>>) -> Self {
ExecutorThread {
execution_adapters,
join_handles: vec![],
internal_thread: None,
sender: None,
}
}
/// Returns a clone of the command sender for the internal thread, or `None` when `start`
/// has not stored one.
pub fn sender(&self) -> Option<ExecutorCommandSender> {
    self.sender.clone()
}
/// Starts every execution adapter on its own worker thread and then spawns the internal
/// executor thread that processes `ExecutorCommand`s.
///
/// # Errors
///
/// * `ExecutorThreadError::InvalidState` if a command sender is already set, i.e. `start`
///   has been called before.
/// * `ExecutorThreadError::ResourcesUnavailable` if an adapter fails to start or a thread
///   cannot be spawned. The underlying error is logged.
///
/// NOTE(review): on an early error return the adapters already taken out of
/// `execution_adapters` are not put back, and worker threads spawned so far are left as they
/// are — confirm this is acceptable to callers.
pub fn start(&mut self) -> Result<(), ExecutorThreadError> {
    if self.sender.is_some() {
        return Err(ExecutorThreadError::InvalidState);
    }

    let (registry_sender, receiver) = channel();

    for (index, mut execution_adapter) in self.execution_adapters.drain(..).enumerate() {
        // Every adapter gets its own event channel; its position in the list is used as
        // the name of the sender.
        let (sender, adapter_receiver) = channel();
        let ee_sender = NamedExecutionEventSender::new(sender, index);

        if let Err(err) = execution_adapter.start(Box::new(InternalRegistry {
            event_sender: ee_sender,
            registry_sender: registry_sender.clone(),
        })) {
            warn!("Unable to start execution adapter: {}", err);
            return Err(ExecutorThreadError::ResourcesUnavailable);
        }

        match Self::start_execution_adapter_thread(
            execution_adapter,
            adapter_receiver,
            &registry_sender,
            index,
        ) {
            Ok(join_handle) => {
                self.join_handles.push(join_handle);
            }
            Err(err) => {
                warn!("Unable to start thread for execution adapter: {}", err);
                return Err(ExecutorThreadError::ResourcesUnavailable);
            }
        }
    }

    self.sender = Some(registry_sender.clone());

    match self.start_thread(registry_sender, receiver) {
        Ok(join_handle) => {
            self.internal_thread = Some(join_handle);
        }
        Err(err) => {
            warn!("unable to start internal executor thread: {}", err);
            return Err(ExecutorThreadError::ResourcesUnavailable);
        }
    }

    Ok(())
}
/// Sends `ExecutorCommand::Shutdown` to the internal thread and waits for it to exit.
///
/// Does nothing when no command sender is set. A failed send or join is logged rather than
/// returned.
pub fn stop(mut self) {
    let sender = match self.sender.take() {
        Some(sender) => sender,
        None => return,
    };

    if let Err(err) = sender.send(ExecutorCommand::Shutdown) {
        warn!("Unable to send shutdown signal to executor thread: {}", err);
    }

    let join_result = self.internal_thread.take().map(|internal| internal.join());
    if let Some(Err(err)) = join_result {
        warn!("During stop of executor thread: {:?}", err);
    }
}
/// Spawns the worker thread that feeds execution events to a single adapter.
///
/// The thread, named `execution_adapter_thread_{index}`, runs until it receives
/// `ExecutionCommand::Sentinel`, its channel is disconnected, or the adapter's `execute`
/// call returns an error; it then stops the adapter. Tasks the adapter reports back with a
/// timeout or routing error are re-submitted through `sender` as new
/// `ExecutorCommand::Execution` commands.
fn start_execution_adapter_thread(
    execution_adapter: Box<dyn ExecutionAdapter>,
    receiver: ExecutionEventReceiver,
    sender: &ExecutorCommandSender,
    index: usize,
) -> Result<JoinHandle<()>, std::io::Error> {
    let executor_sender = sender.clone();
    let thread_name = format!("execution_adapter_thread_{}", index);

    std::thread::Builder::new().name(thread_name).spawn(move || {
        loop {
            let execution_event = match receiver.recv() {
                Ok(ExecutionCommand::Event(execution_event)) => execution_event,
                // A sentinel and a disconnected channel both end the worker.
                Ok(ExecutionCommand::Sentinel) | Err(_) => break,
            };

            // The callback is moved into the adapter, so it gets its own sender handle.
            let retry_sender = executor_sender.clone();
            let (notifier, task) = *execution_event;
            let (pair, context_id) = task.take();

            let callback = Box::new(move |result| {
                let notifier = notifier.clone();
                match result {
                    Ok(tp_processing_result) => {
                        notifier.notify(tp_processing_result);
                    }
                    Err(ExecutionAdapterError::TimeoutError(transaction_pair)) => {
                        let retry_event =
                            (notifier, ExecutionTask::new(*transaction_pair, context_id));
                        let command = ExecutorCommand::Execution(Box::new(retry_event));
                        if let Err(err) = retry_sender.send(command) {
                            warn!("During retry of TimeOutError: {}", err);
                        }
                    }
                    Err(ExecutionAdapterError::RoutingError(transaction_pair)) => {
                        let retry_event =
                            (notifier, ExecutionTask::new(*transaction_pair, context_id));
                        let command = ExecutorCommand::Execution(Box::new(retry_event));
                        if let Err(err) = retry_sender.send(command) {
                            warn!("During retry of RoutingError: {}", err);
                        }
                    }
                    Err(ExecutionAdapterError::GeneralExecutionError(err)) => {
                        error!("General Execution Error: {}", err);
                    }
                }
            });

            if let Err(err) = execution_adapter.execute(pair, context_id, callback) {
                error!("Unable to execute on adapter {}: {}", index, err);
                break;
            }
        }

        if let Err(err) = execution_adapter.stop() {
            error!("Unable to cleanly stop adapter {}: {}", index, err);
        }
    })
}
/// Spawns the internal executor thread (`internal_executor_thread`).
///
/// The thread owns the routing state — registered adapter senders per transaction family,
/// parked events and running task readers — and processes commands from `receiver` until it
/// sees `ExecutorCommand::Shutdown` or the channel reports an error. On exit it sends a
/// sentinel to the registered adapter senders and stops every remaining reader. `sender` is
/// cloned for each reader that is started.
fn start_thread(
    &self,
    sender: ExecutorCommandSender,
    receiver: ExecutorCommandReceiver,
) -> Result<JoinHandle<()>, std::io::Error> {
    let builder = std::thread::Builder::new().name("internal_executor_thread".to_string());

    builder.spawn(move || {
        // Adapter senders currently registered for each transaction family.
        let mut fanout_threads: HashMap<TransactionFamily, HashSet<NamedExecutionEventSender>> =
            HashMap::new();
        // Events waiting for an adapter to register for their family.
        let mut parked: ParkedExecutionEventsMap = HashMap::new();
        // Events released by a registration; retried at the top of the next iteration.
        let mut unparked = vec![];
        let mut reader_index = 0usize;
        let mut readers: BTreeMap<usize, ExecutionTaskReader> = BTreeMap::new();

        loop {
            for released in unparked.drain(..) {
                Self::try_send_execution_event(Box::new(released), &fanout_threads, &mut parked);
            }

            let command = match receiver.recv() {
                Ok(command) => command,
                Err(err) => {
                    error!("Received error while processing executor commands: {}", err);
                    break;
                }
            };

            match command {
                ExecutorCommand::Execution(execution_event) => {
                    Self::try_send_execution_event(execution_event, &fanout_threads, &mut parked)
                }
                ExecutorCommand::RegistrationChange(RegistrationChange::RegisterRequest((
                    family,
                    event_sender,
                ))) => {
                    // Release everything parked for this family before recording the sender.
                    if let Some(waiting) = parked.get_mut(&family) {
                        unparked.append(waiting);
                    }
                    #[allow(clippy::mutable_key_type)]
                    let ea_senders = fanout_threads.entry(family).or_insert_with(HashSet::new);
                    ea_senders.insert(event_sender);
                }
                ExecutorCommand::RegistrationChange(RegistrationChange::UnregisterRequest((
                    family,
                    event_sender,
                ))) => {
                    if let Some(ea_senders) = fanout_threads.get_mut(&family) {
                        ea_senders.remove(&event_sender);
                    }
                }
                ExecutorCommand::CreateReader(task_iterator, notifier) => {
                    let index = reader_index.wrapping_add(1);
                    let mut reader = ExecutionTaskReader::new(index);
                    if let Err(err) = reader.start(task_iterator, notifier, sender.clone()) {
                        error!("Unable to start task reader: {}", err);
                        // The index is only consumed by a reader that actually started.
                        continue;
                    }
                    readers.insert(index, reader);
                    reader_index = index;
                }
                ExecutorCommand::ReaderDone(reader_id) => {
                    if let Some(reader) = readers.remove(&reader_id) {
                        reader.stop();
                    }
                }
                ExecutorCommand::Shutdown => break,
            }
        }

        Self::shutdown_fanout_threads(&fanout_threads);
        for (_, reader) in readers {
            reader.stop();
        }
    })
}
/// Sends `ExecutionCommand::Sentinel` to every distinct adapter sender registered in
/// `fanout_threads`. A failed send is logged and otherwise ignored.
fn shutdown_fanout_threads(
    fanout_threads: &HashMap<TransactionFamily, HashSet<NamedExecutionEventSender>>,
) {
    // The same sender may be registered for several families; gathering the senders into
    // one set first means each of them is signalled once.
    for named in fanout_threads.values().flatten().collect::<HashSet<_>>() {
        if let Err(err) = named.sender.send(ExecutionCommand::Sentinel) {
            warn!("During stop of ExecutorThread internal thread: {}", err);
        }
    }
}
/// Delivers `execution_event` to an adapter sender registered for the event's transaction
/// family, or parks it in `parked` when no sender is registered for that family.
///
/// The first sender yielded by the family's set is used. A failed send is logged and the
/// event is not parked.
fn try_send_execution_event(
    execution_event: Box<ExecutionEvent>,
    fanout_threads: &HashMap<TransactionFamily, HashSet<NamedExecutionEventSender>>,
    parked: &mut ParkedExecutionEventsMap,
) {
    let tf = TransactionFamily::from_pair(execution_event.1.pair());

    // A missing family and an empty sender set are handled the same way: park the event.
    let target = fanout_threads
        .get(&tf)
        .and_then(|ea_senders| ea_senders.iter().next());

    match target {
        Some(named) => {
            if let Err(err) = named.sender.send(ExecutionCommand::Event(execution_event)) {
                warn!("During send of ExecutionCommand: {}", err);
            }
        }
        None => Self::park_execution_event(parked, *execution_event, tf),
    }
}
/// Appends `execution_event` to the events parked for `transaction_family`, creating the
/// family's list when it does not exist yet.
fn park_execution_event(
    parked: &mut ParkedExecutionEventsMap,
    execution_event: ExecutionEvent,
    transaction_family: TransactionFamily,
) {
    parked
        .entry(transaction_family)
        .or_insert_with(Vec::new)
        .push(execution_event);
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::{self, collections::HashSet, sync::mpsc::channel};
use cylinder::{secp256k1::Secp256k1Context, Context, Signer};
use crate::transact::context::ContextId;
use crate::transact::execution::adapter::test_adapter::TestExecutionAdapter;
use crate::transact::protocol::transaction::{HashMethod, TransactionBuilder, TransactionPair};
use crate::transact::scheduler::ExecutionTaskCompletionNotification;
// Transaction family name and version used for the test transactions and registrations.
static FAMILY_NAME: &str = "test";
static FAMILY_VERSION: &str = "1.0";
// Hex strings used as fixture data when building test transactions: KEY1 is the batcher
// public key, KEY2/KEY3 the dependencies, KEY4/KEY5 the inputs and KEY6/KEY7 the outputs
// (only the first four characters of KEY5 and KEY7 are decoded).
static KEY1: &str = "111111111111111111111111111111111111111111111111111111111111111111";
static KEY2: &str = "222222222222222222222222222222222222222222222222222222222222222222";
static KEY3: &str = "333333333333333333333333333333333333333333333333333333333333333333";
static KEY4: &str = "444444444444444444444444444444444444444444444444444444444444444444";
static KEY5: &str = "555555555555555555555555555555555555555555555555555555555555555555";
static KEY6: &str = "666666666666666666666666666666666666666666666666666666666666666666";
static KEY7: &str = "777777777777777777777777777777777777777777777777777777777777777777";
// Nonce set on every test transaction.
static NONCE: &str = "f9kdzz";
// Payload bytes of every test transaction.
static BYTES2: [u8; 4] = [0x05, 0x06, 0x07, 0x08];
// Number of execution tasks produced by `create_iterator`.
static NUMBER_OF_TRANSACTIONS: usize = 20;
/// Replays the routing rules of the internal thread by hand, without spawning any thread:
/// a registration followed by `NUMBER_OF_TRANSACTIONS` execution commands is queued, routed
/// to the registered sender (or parked), "executed" by notifying `Valid`, and the number of
/// notifications is checked.
#[test]
fn test_executor_internal() {
    let (sender, notification_receiver) = channel::<ExecutionTaskCompletionNotification>();
    let notifier: Box<dyn ExecutionTaskCompletionNotifier> =
        Box::new(ChannelExecutionTaskCompletionNotifier { tx: sender });

    let (registration_execution_event_sender, internal_receiver): (
        ExecutorCommandSender,
        ExecutorCommandReceiver,
    ) = channel::<ExecutorCommand>();
    let (execution_adapter_sender, receiver) = channel::<ExecutionCommand>();

    // Queue the registration first so that no execution event has to be parked.
    let tf = TransactionFamily::new(FAMILY_NAME.to_string(), FAMILY_VERSION.to_string());
    let named_sender = NamedExecutionEventSender::new(execution_adapter_sender, 0);
    registration_execution_event_sender
        .send(ExecutorCommand::RegistrationChange(
            RegistrationChange::RegisterRequest((tf, named_sender)),
        ))
        .expect("The receiver is dropped");

    for execution_task in create_iterator() {
        let execution_event = (notifier.clone(), execution_task);
        registration_execution_event_sender
            .send(ExecutorCommand::Execution(Box::new(execution_event)))
            .expect("Receiver has been dropped");
    }

    let mut parked_transaction_map: ParkedExecutionEventsMap = HashMap::new();
    let mut unparked_transactions: Vec<ExecutionEvent> = vec![];
    let mut named_senders: HashMap<TransactionFamily, HashSet<NamedExecutionEventSender>> =
        HashMap::new();

    for event in internal_receiver.try_iter() {
        match event {
            ExecutorCommand::Execution(execution_event) => {
                let tf = TransactionFamily::from_pair(execution_event.1.pair());
                // A missing family and an empty sender set both mean "park the event".
                let target = named_senders
                    .get(&tf)
                    .and_then(|senders| senders.iter().next());
                match target {
                    Some(sender) => {
                        sender
                            .sender
                            .send(ExecutionCommand::Event(execution_event))
                            .expect("The receiver has been dropped");
                    }
                    None => {
                        parked_transaction_map
                            .entry(tf)
                            .or_insert_with(Vec::new)
                            .push(*execution_event);
                    }
                }
            }
            ExecutorCommand::RegistrationChange(RegistrationChange::RegisterRequest((
                tf,
                sender,
            ))) => {
                if let Some(parked) = parked_transaction_map.get_mut(&tf) {
                    unparked_transactions.append(parked);
                }
                named_senders
                    .entry(tf)
                    .or_insert_with(HashSet::new)
                    .insert(sender);
            }
            ExecutorCommand::RegistrationChange(RegistrationChange::UnregisterRequest((
                tf,
                sender,
            ))) => {
                if let Some(senders) = named_senders.get_mut(&tf) {
                    senders.remove(&sender);
                }
            }
            _ => panic!("Should not have sent command during test"),
        }
    }

    // Stand in for the adapter: report every routed task as valid.
    for event in receiver.try_iter() {
        if let ExecutionCommand::Event(execution_event) = event {
            let (notifier, task) = *execution_event;
            let notification = ExecutionTaskCompletionNotification::Valid(
                *task.context_id(),
                task.pair().transaction().header_signature().into(),
            );
            notifier.notify(notification);
        }
    }

    let results: Vec<_> = notification_receiver.try_iter().collect();

    assert_eq!(
        results.len(),
        NUMBER_OF_TRANSACTIONS,
        "Incorrect number of results received",
    );
}
/// End-to-end check of `ExecutorThread`: tasks submitted before the adapter registers its
/// family produce no result, and once the family is registered a notification arrives for
/// each of the `NUMBER_OF_TRANSACTIONS` tasks.
#[test]
fn test_executor_thread() {
    let noop_adapter = TestExecutionAdapter::new();
    let adapter = noop_adapter.clone();

    let mut executor_thread: ExecutorThread = ExecutorThread::new(vec![Box::new(noop_adapter)]);
    executor_thread
        .start()
        .expect("Start can only be called once");
    let sender = executor_thread
        .sender()
        .expect("Sender is some after start is called");

    let (tx, receiver) = channel();
    let notifier: Box<dyn ExecutionTaskCompletionNotifier> =
        Box::new(ChannelExecutionTaskCompletionNotifier { tx });

    for execution_task in create_iterator() {
        let command = ExecutorCommand::Execution(Box::new((notifier.clone(), execution_task)));
        sender.send(command).expect("Receiver has been dropped");
    }

    // Nothing is registered for the family yet, so no result can have been produced.
    assert!(
        receiver.try_recv().is_err(),
        "The result is not available yet"
    );

    adapter.register("test", "1.0");

    let results: Vec<_> = (0..NUMBER_OF_TRANSACTIONS)
        .filter_map(|_| receiver.recv().ok())
        .collect();

    assert_eq!(
        results.len(),
        NUMBER_OF_TRANSACTIONS,
        "Incorrect number of results received",
    );

    executor_thread.stop();
}
/// Builds a transaction pair for the test family from the fixture constants, signed with
/// `signer`. Panics if a fixture cannot be hex-decoded or the builder rejects the input.
fn create_txn(signer: &dyn Signer) -> TransactionPair {
    let batcher_public_key = hex::decode(KEY1).unwrap();
    let dependencies = vec![KEY2.to_string(), KEY3.to_string()];
    let inputs = vec![
        hex::decode(KEY4).unwrap(),
        hex::decode(&KEY5[0..4]).unwrap(),
    ];
    let nonce = NONCE.to_string().into_bytes();
    let outputs = vec![
        hex::decode(KEY6).unwrap(),
        hex::decode(&KEY7[0..4]).unwrap(),
    ];

    let builder = TransactionBuilder::new()
        .with_batcher_public_key(batcher_public_key)
        .with_dependencies(dependencies)
        .with_family_name(FAMILY_NAME.to_string())
        .with_family_version(FAMILY_VERSION.to_string())
        .with_inputs(inputs)
        .with_nonce(nonce)
        .with_outputs(outputs)
        .with_payload_hash_method(HashMethod::Sha512)
        .with_payload(BYTES2.to_vec());

    builder
        .build_pair(signer)
        .expect("The TransactionBuilder was not given the correct items")
}
/// Returns a lazy iterator of `NUMBER_OF_TRANSACTIONS` execution tasks. Every task carries a
/// freshly built transaction signed with one new signer and a clone of the default context
/// id.
fn create_iterator() -> impl Iterator<Item = ExecutionTask> {
    let signer = new_signer();
    let context_id = ContextId::default();

    (0..NUMBER_OF_TRANSACTIONS).map(move |_| {
        let txn_pair = create_txn(&*signer);
        ExecutionTask::new(txn_pair, context_id.clone())
    })
}
/// Creates a signer backed by a newly generated random secp256k1 private key.
fn new_signer() -> Box<dyn Signer> {
    let context = Secp256k1Context::new();
    let private_key = context.new_random_private_key();
    context.new_signer(private_key)
}
/// Test notifier that forwards every completion notification onto an mpsc channel.
#[derive(Clone)]
struct ChannelExecutionTaskCompletionNotifier {
    tx: Sender<ExecutionTaskCompletionNotification>,
}

impl ExecutionTaskCompletionNotifier for ChannelExecutionTaskCompletionNotifier {
    /// Sends `notification` on the channel; panics if the send fails.
    fn notify(&self, notification: ExecutionTaskCompletionNotification) {
        let outcome = self.tx.send(notification);
        outcome.expect("Unable to send the notification");
    }

    /// Returns a boxed copy that sends on the same channel.
    fn clone_box(&self) -> Box<dyn ExecutionTaskCompletionNotifier> {
        let duplicate = ChannelExecutionTaskCompletionNotifier {
            tx: self.tx.clone(),
        };
        Box::new(duplicate)
    }
}
}
/// Registry handed to an execution adapter when it is started.
///
/// It turns the adapter's family registrations and unregistrations into
/// `ExecutorCommand::RegistrationChange` messages on `registry_sender`, each carrying a clone
/// of the adapter's own `event_sender`.
struct InternalRegistry {
    registry_sender: ExecutorCommandSender,
    event_sender: NamedExecutionEventSender,
}

impl ExecutionRegistry for InternalRegistry {
    /// Requests that `family` be served by this registry's event sender. A failed send is
    /// logged.
    fn register_transaction_family(&mut self, family: TransactionFamily) {
        let request = RegistrationChange::RegisterRequest((family, self.event_sender.clone()));
        let outcome = self
            .registry_sender
            .send(ExecutorCommand::RegistrationChange(request));

        if let Err(err) = outcome {
            warn!(
                "During sending registration of transaction family on channel: {}",
                err
            );
        }
    }

    /// Requests that `family` no longer be served by this registry's event sender. A failed
    /// send is logged.
    fn unregister_transaction_family(&mut self, family: &TransactionFamily) {
        let request =
            RegistrationChange::UnregisterRequest((family.clone(), self.event_sender.clone()));
        let outcome = self
            .registry_sender
            .send(ExecutorCommand::RegistrationChange(request));

        if let Err(err) = outcome {
            warn!(
                "During sending unregistration of transaction family on channel: {}",
                err
            );
        }
    }
}