mod storage_wrapper;
mod vm_wrapper;
use crate::{
adapter_common::{preprocess_transaction, PreprocessedTransaction},
aptos_vm::AptosVM,
parallel_executor::vm_wrapper::AptosVMWrapper,
};
use aptos_parallel_executor::{
errors::Error,
executor::ParallelTransactionExecutor,
task::{Transaction as PTransaction, TransactionOutput as PTransactionOutput},
};
use aptos_state_view::StateView;
use aptos_types::{
state_store::state_key::StateKey,
transaction::{Transaction, TransactionOutput, TransactionStatus},
write_set::{WriteOp, WriteSet},
};
use move_deps::move_core_types::vm_status::{StatusCode, VMStatus};
use rayon::prelude::*;
impl PTransaction for PreprocessedTransaction {
type Key = StateKey;
type Value = WriteOp;
}
pub(crate) struct AptosTransactionOutput(TransactionOutput);
impl AptosTransactionOutput {
pub fn new(output: TransactionOutput) -> Self {
Self(output)
}
pub fn into(self) -> TransactionOutput {
self.0
}
}
impl PTransactionOutput for AptosTransactionOutput {
type T = PreprocessedTransaction;
fn get_writes(&self) -> Vec<(StateKey, WriteOp)> {
self.0.write_set().iter().cloned().collect()
}
fn skip_output() -> Self {
Self(TransactionOutput::new(
WriteSet::default(),
vec![],
0,
TransactionStatus::Retry,
))
}
}
pub struct ParallelAptosVM();
impl ParallelAptosVM {
pub fn execute_block<S: StateView>(
transactions: Vec<Transaction>,
state_view: &S,
concurrency_level: usize,
) -> Result<(Vec<TransactionOutput>, Option<Error<VMStatus>>), VMStatus> {
let signature_verified_block: Vec<PreprocessedTransaction> = transactions
.par_iter()
.map(|txn| preprocess_transaction::<AptosVM>(txn.clone()))
.collect();
match ParallelTransactionExecutor::<PreprocessedTransaction, AptosVMWrapper<S>>::new(
concurrency_level,
)
.execute_transactions_parallel(state_view, signature_verified_block)
{
Ok(results) => Ok((
results
.into_iter()
.map(AptosTransactionOutput::into)
.collect(),
None,
)),
Err(err @ Error::InferencerError) | Err(err @ Error::UnestimatedWrite) => {
let output = AptosVM::execute_block_and_keep_vm_status(transactions, state_view)?;
Ok((
output
.into_iter()
.map(|(_vm_status, txn_output)| txn_output)
.collect(),
Some(err),
))
}
Err(Error::InvariantViolation) => Err(VMStatus::Error(
StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR,
)),
Err(Error::UserError(err)) => Err(err),
}
}
}