use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use miden_node_utils::ErrorReport;
use miden_node_utils::lru_cache::LruCache;
use miden_node_utils::tracing::OpenTelemetrySpanExt;
use miden_protocol::Word;
use miden_protocol::account::{
Account,
AccountId,
AccountStorageHeader,
PartialAccount,
StorageMapKey,
StorageMapWitness,
StorageSlotName,
StorageSlotType,
};
use miden_protocol::asset::{AssetVaultKey, AssetWitness};
use miden_protocol::block::{BlockHeader, BlockNumber};
use miden_protocol::errors::TransactionInputError;
use miden_protocol::note::{Note, NoteScript};
use miden_protocol::transaction::{
AccountInputs,
ExecutedTransaction,
InputNote,
InputNotes,
PartialBlockchain,
ProvenTransaction,
TransactionArgs,
TransactionId,
TransactionInputs,
};
use miden_protocol::vm::FutureMaybeSend;
use miden_remote_prover_client::RemoteTransactionProver;
use miden_tx::auth::UnreachableAuth;
use miden_tx::{
DataStore,
DataStoreError,
ExecutionOptions,
FailedNote,
LocalTransactionProver,
MastForestStore,
NoteCheckerError,
NoteConsumptionChecker,
NoteConsumptionInfo,
TransactionExecutor,
TransactionExecutorError,
TransactionMastStore,
TransactionProverError,
};
use tokio::sync::Mutex;
use tracing::{Instrument, instrument};
use crate::COMPONENT;
use crate::actor::candidate::TransactionCandidate;
use crate::clients::{BlockProducerClient, StoreClient, ValidatorClient};
use crate::db::Db;
/// Errors produced while filtering, executing, proving, validating and submitting a network
/// transaction.
#[derive(Debug, thiserror::Error)]
pub enum NtxError {
/// The notes that passed the consumability check could not be converted into input notes.
#[error("note inputs were invalid")]
InputNotes(#[source] TransactionInputError),
/// The note consumability check itself returned an error.
#[error("failed to filter notes")]
NoteFilter(#[source] NoteCheckerError),
/// No note passed the consumability check; carries every note that failed.
#[error("all notes failed to be executed")]
AllNotesFailed(Vec<FailedNote>),
/// The transaction executor returned an error.
#[error("failed to execute transaction")]
Execution(#[source] TransactionExecutorError),
/// The transaction prover (remote or local) returned an error.
#[error("failed to prove transaction")]
Proving(#[source] TransactionProverError),
/// A client call carrying the proven transaction returned a `tonic::Status`.
///
/// In this file the variant is produced both by the block-producer submission and by the
/// validator call.
#[error("failed to submit transaction")]
Submission(#[source] tonic::Status),
}
/// Shorthand for a `Result` whose error type is [`NtxError`].
type NtxResult<T> = Result<T, NtxError>;
/// Output of a successful [`NtxContext::execute_transaction`] call, in order:
/// the ID of the proven transaction, the notes that failed the consumability check, and the
/// `(script root, script)` pairs that the data store fetched from the remote store.
pub type NtxExecutionResult = (TransactionId, Vec<FailedNote>, Vec<(Word, NoteScript)>);
/// Bundles the clients and settings needed to execute, prove, validate and submit a network
/// transaction.
#[derive(Clone)]
pub struct NtxContext {
// Client the proven transaction is submitted to as the final step.
block_producer: BlockProducerClient,
// Client the proven transaction and its inputs are sent to before submission.
validator: ValidatorClient,
// Remote prover; when `None`, a `LocalTransactionProver` is used instead.
prover: Option<RemoteTransactionProver>,
// Store client handed to the per-transaction data store.
store: StoreClient,
// Cache of note scripts keyed by script root, handed to the per-transaction data store.
script_cache: LruCache<Word, NoteScript>,
// Local database handle, handed to the per-transaction data store for note script lookups.
db: Db,
// Cycle limit passed to `ExecutionOptions::new` when building an executor.
max_cycles: u32,
}
impl NtxContext {
/// Creates a new [`NtxContext`] from its parts.
///
/// Every argument is stored unchanged in the field of the same name; no validation is
/// performed here.
pub fn new(
block_producer: BlockProducerClient,
validator: ValidatorClient,
prover: Option<RemoteTransactionProver>,
store: StoreClient,
script_cache: LruCache<Word, NoteScript>,
db: Db,
max_cycles: u32,
) -> Self {
Self {
block_producer,
validator,
prover,
store,
script_cache,
db,
max_cycles,
}
}
/// Builds a [`TransactionExecutor`] backed by `data_store` and limited by `self.max_cycles`.
///
/// The configured cycle limit is passed as both the first (optional) and the second argument
/// of `ExecutionOptions::new`; the two boolean flags are left disabled (presumably the
/// tracing/debug toggles; confirm against `ExecutionOptions::new`).
///
/// # Panics
///
/// Panics if the execution options are rejected, either on construction or by the executor.
fn create_executor<'a, 'b>(
    &self,
    data_store: &'a NtxDataStore,
) -> TransactionExecutor<'a, 'b, NtxDataStore, UnreachableAuth> {
    let cycle_limit = self.max_cycles;
    let options = ExecutionOptions::new(
        Some(cycle_limit),
        cycle_limit,
        ExecutionOptions::DEFAULT_CORE_TRACE_FRAGMENT_SIZE,
        false,
        false,
    )
    .expect("max_cycles should be within valid range");

    let configured = TransactionExecutor::new(data_store).with_options(options);
    configured.expect("execution options should be valid for transaction executor")
}
/// Runs the full network-transaction pipeline for a single candidate.
///
/// The steps run strictly in this order: build the data store, filter the candidate's notes for
/// consumability, execute the transaction with the notes that passed, collect the note scripts
/// the data store fetched from the remote store, prove, validate, and finally submit to the
/// block producer.
///
/// # Returns
///
/// The proven transaction's ID, the notes that failed the consumability check, and the fetched
/// `(script root, script)` pairs.
///
/// # Errors
///
/// Returns the first [`NtxError`] raised by any step; later steps are not attempted. The error
/// is also recorded on the current tracing span via `set_error`.
#[instrument(target = COMPONENT, name = "ntx.execute_transaction", skip_all, err)]
pub fn execute_transaction(
self,
tx: TransactionCandidate,
) -> impl FutureMaybeSend<NtxResult<NtxExecutionResult>> {
let TransactionCandidate {
account,
notes,
chain_tip_header,
chain_mmr,
} = tx;
// Attach identifying attributes to whichever span is current at this point.
tracing::Span::current().set_attribute("account.id", account.id());
tracing::Span::current()
.set_attribute("account.id.network_prefix", account.id().prefix().to_string().as_str());
tracing::Span::current().set_attribute("notes.count", notes.len());
tracing::Span::current()
.set_attribute("reference_block.number", chain_tip_header.block_num());
async move {
// The pipeline future is boxed (presumably to keep the outer future small; confirm).
Box::pin(async move {
// Per-transaction data store; it receives clones of the shared clients and cache.
let data_store = NtxDataStore::new(
account,
chain_tip_header,
chain_mmr,
self.store.clone(),
self.script_cache.clone(),
self.db.clone(),
);
// Filter out notes that cannot be consumed; fails if none remain.
let notes = notes.into_iter().map(Note::from).collect::<Vec<_>>();
let (successful_notes, failed_notes) =
self.filter_notes(&data_store, notes).await?;
// Execute the transaction with the notes that passed the check.
let executed_tx = Box::pin(self.execute(&data_store, successful_notes)).await?;
// Drain the scripts the data store fetched from the remote store so far.
let scripts_to_cache = data_store.take_fetched_scripts().await;
// Prove the transaction from its inputs.
let tx_inputs: TransactionInputs = executed_tx.into();
let proven_tx = Box::pin(self.prove(&tx_inputs)).await?;
// Validation must succeed before the transaction is submitted.
self.validate(&proven_tx, &tx_inputs).await?;
self.submit(&proven_tx).await?;
Ok((proven_tx.id(), failed_notes, scripts_to_cache))
})
.in_current_span()
.await
// Record any failure on the current span before returning it.
.inspect_err(|err| tracing::Span::current().set_error(err))
}
}
/// Splits `notes` into those the account can consume and those it cannot.
///
/// Every failed note is logged at `info` level. The successful notes are converted into
/// unauthenticated input notes.
///
/// # Errors
///
/// - [`NtxError::NoteFilter`] if the consumability check itself fails.
/// - [`NtxError::InputNotes`] if the successful notes cannot be turned into input notes.
/// - [`NtxError::AllNotesFailed`] if no note passed the check.
#[instrument(target = COMPONENT, name = "ntx.execute_transaction.filter_notes", skip_all, err)]
async fn filter_notes(
    &self,
    data_store: &NtxDataStore,
    notes: Vec<Note>,
) -> NtxResult<(InputNotes<InputNote>, Vec<FailedNote>)> {
    let executor = self.create_executor(data_store);
    let checker = NoteConsumptionChecker::new(&executor);

    let consumability_check = Box::pin(checker.check_notes_consumability(
        data_store.account.id(),
        data_store.reference_block.block_num(),
        notes,
        TransactionArgs::default(),
    ));
    let NoteConsumptionInfo { successful, failed, .. } =
        consumability_check.await.map_err(NtxError::NoteFilter)?;

    for rejected in &failed {
        tracing::info!(
            note.id = %rejected.note.id(),
            nullifier = %rejected.note.nullifier(),
            err = %rejected.error.as_report(),
            "note failed consumability check",
        );
    }

    let consumable =
        InputNotes::from_unauthenticated_notes(successful).map_err(NtxError::InputNotes)?;

    // Without at least one consumable note there is nothing to execute.
    if consumable.is_empty() {
        return Err(NtxError::AllNotesFailed(failed));
    }

    Ok((consumable, failed))
}
/// Executes a transaction that consumes `notes` against the data store's account at its
/// reference block, using default transaction arguments.
///
/// # Errors
///
/// Returns [`NtxError::Execution`] if the executor fails.
#[instrument(target = COMPONENT, name = "ntx.execute_transaction.execute", skip_all, err)]
async fn execute(
    &self,
    data_store: &NtxDataStore,
    notes: InputNotes<InputNote>,
) -> NtxResult<ExecutedTransaction> {
    let executor = self.create_executor(data_store);

    let account_id = data_store.account.id();
    let reference_block = data_store.reference_block.block_num();
    let execution = Box::pin(executor.execute_transaction(
        account_id,
        reference_block,
        notes,
        TransactionArgs::default(),
    ));

    execution.await.map_err(NtxError::Execution)
}
/// Proves the transaction described by `tx_inputs`.
///
/// Uses the configured remote prover when there is one, otherwise a default
/// `LocalTransactionProver`. The inputs are cloned only on the local path.
///
/// # Errors
///
/// Returns [`NtxError::Proving`] if the selected prover fails.
#[instrument(target = COMPONENT, name = "ntx.execute_transaction.prove", skip_all, err)]
async fn prove(&self, tx_inputs: &TransactionInputs) -> NtxResult<ProvenTransaction> {
    let proof = match &self.prover {
        Some(remote) => remote.prove(tx_inputs).await,
        None => LocalTransactionProver::default().prove(tx_inputs.clone()).await,
    };

    proof.map_err(NtxError::Proving)
}
/// Sends the proven transaction to the block producer.
///
/// # Errors
///
/// Returns [`NtxError::Submission`] wrapping the status returned by the client.
#[instrument(target = COMPONENT, name = "ntx.execute_transaction.submit", skip_all, err)]
async fn submit(&self, proven_tx: &ProvenTransaction) -> NtxResult<()> {
    let outcome = self.block_producer.submit_proven_transaction(proven_tx).await;
    outcome.map_err(NtxError::Submission)
}
/// Sends the proven transaction together with its inputs to the validator.
///
/// # Errors
///
/// Returns [`NtxError::Submission`] wrapping the status returned by the client.
///
/// NOTE(review): validator failures reuse `NtxError::Submission`, whose message reads
/// "failed to submit transaction"; a dedicated variant would make the two failure modes
/// distinguishable in logs.
#[instrument(target = COMPONENT, name = "ntx.execute_transaction.validate", skip_all, err)]
async fn validate(
    &self,
    proven_tx: &ProvenTransaction,
    tx_inputs: &TransactionInputs,
) -> NtxResult<()> {
    let outcome = self.validator.submit_proven_transaction(proven_tx, tx_inputs).await;
    outcome.map_err(NtxError::Submission)
}
}
/// Data store backing the execution of a single network transaction.
///
/// It serves the native account from memory and fetches everything else (foreign accounts,
/// witnesses, note scripts) from the store client, the local database or the script cache.
struct NtxDataStore {
// Native account the transaction is executed against.
account: Account,
// Block header the transaction is executed against.
reference_block: BlockHeader,
// Partial blockchain returned alongside the reference block as transaction input.
chain_mmr: Arc<PartialBlockchain>,
// Holds the code of the native account and of any foreign account fetched so far.
mast_store: TransactionMastStore,
// Client used for foreign account inputs, witnesses and note scripts.
store: StoreClient,
// Cache of note scripts keyed by script root; consulted first on script lookups.
script_cache: LruCache<Word, NoteScript>,
// Local database; consulted for note scripts after the cache and before the remote store.
db: Db,
// Scripts fetched from the remote store, drained by `take_fetched_scripts`.
fetched_scripts: Arc<Mutex<Vec<(Word, NoteScript)>>>,
// Maps `(account ID, slot value)` of registered map slots to the slot's name; looked up by
// `(account ID, map root)` when a storage map witness is requested.
storage_slots: Arc<Mutex<BTreeMap<(AccountId, Word), StorageSlotName>>>,
}
impl NtxDataStore {
/// Creates a data store for `account` at `reference_block`.
///
/// The account's code is loaded into a fresh MAST store up front; the list of fetched scripts
/// and the storage-slot registry both start out empty.
fn new(
    account: Account,
    reference_block: BlockHeader,
    chain_mmr: Arc<PartialBlockchain>,
    store: StoreClient,
    script_cache: LruCache<Word, NoteScript>,
    db: Db,
) -> Self {
    let code_store = TransactionMastStore::new();
    code_store.load_account_code(account.code());

    Self {
        account,
        reference_block,
        chain_mmr,
        mast_store: code_store,
        store,
        script_cache,
        db,
        fetched_scripts: Arc::default(),
        storage_slots: Arc::default(),
    }
}
/// Returns every script recorded as fetched from the remote store so far and leaves the
/// internal list empty.
async fn take_fetched_scripts(&self) -> Vec<(Word, NoteScript)> {
    let mut fetched = self.fetched_scripts.lock().await;
    std::mem::take(&mut *fetched)
}
/// Records the name of every map-typed slot in `storage_header`, keyed by the account ID and
/// the slot's value, so that later witness requests can resolve a slot name from that pair.
///
/// Slots of any other type are skipped. An existing entry with the same key is overwritten.
async fn register_storage_map_slots(
    &self,
    account_id: AccountId,
    storage_header: &AccountStorageHeader,
) {
    let mut registry = self.storage_slots.lock().await;

    for header in storage_header.slots() {
        if !matches!(header.slot_type(), StorageSlotType::Map) {
            continue;
        }

        let key = (account_id, header.value());
        registry.insert(key, header.name().clone());
    }
}
}
impl DataStore for NtxDataStore {
/// Returns the native account, the reference block header and the partial blockchain.
///
/// As a side effect, the native account's map slots are registered for later storage map
/// witness requests.
///
/// # Errors
///
/// - `DataStoreError::AccountNotFound` if `account_id` is not the account held by this store.
/// - `DataStoreError::BlockNotFound` if the highest requested block number differs from the
///   reference block.
/// - A generic `DataStoreError` if `ref_blocks` is empty.
fn get_transaction_inputs(
    &self,
    account_id: AccountId,
    ref_blocks: BTreeSet<BlockNumber>,
) -> impl FutureMaybeSend<Result<(PartialAccount, BlockHeader, PartialBlockchain), DataStoreError>>
{
    async move {
        if self.account.id() != account_id {
            return Err(DataStoreError::AccountNotFound(account_id));
        }

        // Only the highest requested block is checked against the reference block.
        let Some(requested) = ref_blocks.last().copied() else {
            return Err(DataStoreError::other("no reference block requested"));
        };
        if requested != self.reference_block.block_num() {
            return Err(DataStoreError::BlockNotFound(requested));
        }

        self.register_storage_map_slots(account_id, &self.account.storage().to_header())
            .await;

        let header = self.reference_block.clone();
        let partial_chain = (*self.chain_mmr).clone();
        Ok((PartialAccount::from(&self.account), header, partial_chain))
    }
}
/// Fetches the inputs of a foreign account from the store at `ref_block`.
///
/// On success the account's code is loaded into the MAST store and its map slots are
/// registered for later storage map witness requests.
///
/// In debug builds this asserts that `ref_block` equals the reference block.
///
/// # Errors
///
/// Returns a `DataStoreError` wrapping the store error if the fetch fails.
fn get_foreign_account_inputs(
    &self,
    foreign_account_id: AccountId,
    ref_block: BlockNumber,
) -> impl FutureMaybeSend<Result<AccountInputs, DataStoreError>> {
    async move {
        debug_assert_eq!(ref_block, self.reference_block.block_num());

        let foreign_inputs = self
            .store
            .get_account_inputs(foreign_account_id, ref_block)
            .await
            .map_err(|source| {
                DataStoreError::other_with_source("failed to get account inputs", source)
            })?;

        self.mast_store.load_account_code(foreign_inputs.code());

        let storage_header = foreign_inputs.storage().header();
        self.register_storage_map_slots(foreign_account_id, storage_header).await;

        Ok(foreign_inputs)
    }
}
/// Fetches asset witnesses for `vault_keys` of `account_id` from the store, pinned to the
/// reference block.
///
/// The `_vault_root` argument is ignored.
///
/// # Errors
///
/// Returns a `DataStoreError` wrapping the store error if the fetch fails.
fn get_vault_asset_witnesses(
    &self,
    account_id: AccountId,
    _vault_root: Word,
    vault_keys: BTreeSet<AssetVaultKey>,
) -> impl FutureMaybeSend<Result<Vec<AssetWitness>, DataStoreError>> {
    async move {
        let at_block = Some(self.reference_block.block_num());

        self.store
            .get_vault_asset_witnesses(account_id, vault_keys, at_block)
            .await
            .map_err(|source| {
                DataStoreError::other_with_source("failed to get vault asset witnesses", source)
            })
    }
}
/// Fetches the witness for `map_key` in the storage map identified by `account_id` and
/// `map_root`, pinned to the reference block.
///
/// The slot name is resolved from the registry filled by `register_storage_map_slots`, so the
/// account's storage header must have been registered by an earlier data store call.
///
/// # Errors
///
/// - A generic `DataStoreError` if no slot is registered for `(account_id, map_root)`.
/// - A `DataStoreError` wrapping the store error if the fetch fails.
fn get_storage_map_witness(
    &self,
    account_id: AccountId,
    map_root: Word,
    map_key: StorageMapKey,
) -> impl FutureMaybeSend<Result<StorageMapWitness, DataStoreError>> {
    async move {
        // Copy the slot name out of the registry inside a single statement so the mutex guard
        // is released before the remote call; holding it across the RPC would block every
        // other user of the registry for the whole network round-trip.
        let registered =
            self.storage_slots.lock().await.get(&(account_id, map_root)).cloned();
        let Some(slot_name) = registered else {
            return Err(DataStoreError::other(
                "requested storage slot has not been registered",
            ));
        };

        let ref_block = self.reference_block.block_num();
        let witness = self
            .store
            .get_storage_map_witness(account_id, slot_name, map_key, Some(ref_block))
            .await
            .map_err(|err| {
                DataStoreError::other_with_source("failed to get storage map witness", err)
            })?;

        Ok(witness)
    }
}
/// Looks up the note script with the given root, trying three sources in order:
///
/// 1. the script cache,
/// 2. the local database (a hit is written to the cache),
/// 3. the remote store (a hit is recorded in `fetched_scripts` and written to the cache).
///
/// Returns `Ok(None)` if none of the sources knows the script.
///
/// # Errors
///
/// Returns a `DataStoreError` wrapping the underlying error if the database lookup or the
/// remote fetch fails.
fn get_note_script(
    &self,
    script_root: Word,
) -> impl FutureMaybeSend<Result<Option<NoteScript>, DataStoreError>> {
    async move {
        // 1. Script cache.
        if let Some(hit) = self.script_cache.get(&script_root).await {
            return Ok(Some(hit));
        }

        // 2. Local database.
        let from_db = self.db.lookup_note_script(script_root).await.map_err(|source| {
            DataStoreError::other_with_source("failed to look up note script in local DB", source)
        })?;
        if let Some(script) = from_db {
            self.script_cache.put(script_root, script.clone()).await;
            return Ok(Some(script));
        }

        // 3. Remote store.
        let from_store =
            self.store.get_note_script_by_root(script_root).await.map_err(|source| {
                DataStoreError::other_with_source(
                    "failed to retrieve note script from store",
                    source,
                )
            })?;
        let Some(script) = from_store else {
            return Ok(None);
        };

        // Record the remote hit so it can be collected via `take_fetched_scripts`.
        self.fetched_scripts.lock().await.push((script_root, script.clone()));
        self.script_cache.put(script_root, script.clone()).await;
        Ok(Some(script))
    }
}
}
/// Exposes the data store's internal MAST store through the [`MastForestStore`] interface.
impl MastForestStore for NtxDataStore {
/// Delegates the lookup for `procedure_hash` to the inner `mast_store` and returns its result
/// unchanged.
fn get(
&self,
procedure_hash: &miden_protocol::Word,
) -> Option<std::sync::Arc<miden_protocol::MastForest>> {
self.mast_store.get(procedure_hash)
}
}