use std::collections::{BTreeSet, HashMap};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use backon::ExponentialBuilder;
use miden_node_utils::ErrorReport;
use miden_node_utils::lru_cache::LruCache;
use miden_node_utils::retry::{self, Retryable};
use miden_node_utils::spawn::spawn_blocking_in_current_span;
use miden_node_utils::tracing::OpenTelemetrySpanExt;
use miden_protocol::Word;
use miden_protocol::account::{
Account,
AccountId,
AccountStorageHeader,
PartialAccount,
StorageMapKey,
StorageMapWitness,
StorageSlotName,
StorageSlotType,
};
use miden_protocol::asset::{AssetVaultKey, AssetWitness};
use miden_protocol::block::{BlockHeader, BlockNumber};
use miden_protocol::errors::TransactionInputError;
use miden_protocol::note::{Note, NoteScript, NoteScriptRoot};
use miden_protocol::transaction::{
AccountInputs,
ExecutedTransaction,
InputNote,
InputNotes,
PartialBlockchain,
ProvenTransaction,
TransactionArgs,
TransactionId,
TransactionInputs,
TransactionScript,
};
use miden_protocol::vm::FutureMaybeSend;
use miden_remote_prover_client::RemoteTransactionProver;
use miden_standards::note::AccountTargetNetworkNote;
use miden_tx::auth::UnreachableAuth;
use miden_tx::{
DataStore,
DataStoreError,
ExecutionOptions,
FailedNote,
MastForestStore,
NoteCheckerError,
NoteConsumptionChecker,
TransactionExecutor,
TransactionExecutorError,
TransactionMastStore,
TransactionProverError,
};
use tracing::{Instrument, instrument};
use crate::COMPONENT;
use crate::actor::candidate::TransactionCandidate;
use crate::clients::{RpcClient, RpcError};
use crate::db::Db;
#[derive(Debug, thiserror::Error)]
pub enum NtxError {
#[error("note inputs were invalid")]
InputNotes(#[source] TransactionInputError),
#[error("failed to filter notes")]
NoteFilter(#[source] NoteCheckerError),
#[error("all notes failed to be executed")]
AllNotesFailed(Vec<FailedNote>),
#[error("failed to execute transaction")]
Execution(#[source] TransactionExecutorError),
#[error("failed to prove transaction")]
Proving(#[source] TransactionProverError),
#[error("failed to submit transaction")]
Submission(#[source] tonic::Status),
}
type NtxResult<T> = Result<T, NtxError>;
fn is_transient_status(status: &tonic::Status) -> bool {
matches!(
status.code(),
tonic::Code::Unavailable
| tonic::Code::DeadlineExceeded
| tonic::Code::Cancelled
| tonic::Code::Aborted
| tonic::Code::Unknown
| tonic::Code::Internal
| tonic::Code::ResourceExhausted,
)
}
fn is_transient_rpc_error(err: &RpcError) -> bool {
matches!(err, RpcError::GrpcClientError(status) if is_transient_status(status))
}
const MAX_REQUEST_RETRIES: usize = 20;
fn request_backoff(initial: Duration, max: Duration) -> ExponentialBuilder {
retry::exponential_bounded(initial, max, MAX_REQUEST_RETRIES)
}
fn log_transient_retry<E: std::error::Error>(operation: &'static str, err: &E, sleep: Duration) {
tracing::warn!(
target: COMPONENT,
operation,
err = %err.as_report(),
sleep_ms = sleep.as_millis() as u64,
"ntx transient request failure; retrying after backoff",
);
}
pub type NtxExecutionResult = (TransactionId, Vec<FailedNote>, Vec<(Word, NoteScript)>);
#[derive(Clone)]
pub struct NtxContext {
prover: RemoteTransactionProver,
rpc: RpcClient,
script_cache: LruCache<Word, NoteScript>,
db: Db,
max_cycles: u32,
#[expect(
dead_code,
reason = "Disabled until https://github.com/0xMiden/protocol/issues/3050 lands"
)]
expiration_script: TransactionScript,
request_backoff: ExponentialBuilder,
}
impl NtxContext {
#[expect(
clippy::too_many_arguments,
reason = "execution context aggregates actor resources"
)]
pub fn new(
prover: RemoteTransactionProver,
rpc: RpcClient,
script_cache: LruCache<Word, NoteScript>,
db: Db,
max_cycles: u32,
expiration_script: TransactionScript,
request_backoff_initial: Duration,
request_backoff_max: Duration,
) -> Self {
let request_backoff = request_backoff(request_backoff_initial, request_backoff_max);
Self {
prover,
rpc,
script_cache,
db,
max_cycles,
expiration_script,
request_backoff,
}
}
fn request_backoff(&self) -> ExponentialBuilder {
self.request_backoff
}
fn create_executor<'a, 'b>(
&self,
data_store: &'a NtxDataStore,
) -> TransactionExecutor<'a, 'b, NtxDataStore, UnreachableAuth> {
let exec_options = ExecutionOptions::new(
Some(self.max_cycles),
self.max_cycles,
ExecutionOptions::DEFAULT_CORE_TRACE_FRAGMENT_SIZE,
false,
false,
)
.expect("max_cycles should be within valid range");
TransactionExecutor::new(data_store)
.with_options(exec_options)
.expect("execution options should be valid for transaction executor")
}
#[instrument(target = COMPONENT, name = "ntx.execute_transaction", skip_all, err)]
pub fn execute_transaction(
self,
tx: TransactionCandidate,
) -> impl FutureMaybeSend<NtxResult<NtxExecutionResult>> {
let TransactionCandidate {
account,
notes,
chain_tip_header,
chain_mmr,
} = tx;
tracing::Span::current().set_attribute("account.id", account.id());
tracing::Span::current()
.set_attribute("account.id.network_prefix", account.id().prefix().to_string().as_str());
tracing::Span::current().set_attribute("notes.count", notes.len());
tracing::Span::current()
.set_attribute("reference_block.number", chain_tip_header.block_num());
async move {
Box::pin(async move {
let notes =
notes.into_iter().map(AccountTargetNetworkNote::into_note).collect::<Vec<_>>();
let ctx = self.clone();
let handle = tokio::runtime::Handle::current();
let span = tracing::Span::current();
let (executed_tx, failed_notes, scripts_to_cache) =
spawn_blocking_in_current_span(move || {
let data_store = NtxDataStore::new(
account,
chain_tip_header,
chain_mmr,
ctx.rpc.clone(),
ctx.script_cache.clone(),
ctx.db.clone(),
ctx.request_backoff,
);
handle.block_on(
async {
let (successful_notes, failed_notes) =
ctx.filter_notes(&data_store, notes).await?;
let executed_tx =
Box::pin(ctx.execute(&data_store, successful_notes)).await?;
let scripts_to_cache = data_store.take_fetched_scripts();
Ok::<_, NtxError>((executed_tx, failed_notes, scripts_to_cache))
}
.instrument(span),
)
})
.await
.unwrap_or_else(|err| std::panic::resume_unwind(err.into_panic()))?;
let tx_inputs: TransactionInputs = executed_tx.into();
let proven_tx = Box::pin(self.prove(&tx_inputs)).await?;
self.submit(&proven_tx, &tx_inputs).await?;
Ok((proven_tx.id(), failed_notes, scripts_to_cache))
})
.in_current_span()
.await
.inspect_err(|err| tracing::Span::current().set_error(err))
}
}
#[instrument(target = COMPONENT, name = "ntx.execute_transaction.filter_notes", skip_all, err)]
async fn filter_notes(
&self,
data_store: &NtxDataStore,
notes: Vec<Note>,
) -> NtxResult<(InputNotes<InputNote>, Vec<FailedNote>)> {
let executor = self.create_executor(data_store);
let checker = NoteConsumptionChecker::new(&executor);
match Box::pin(checker.check_notes_consumability(
data_store.account.id(),
data_store.reference_block.block_num(),
notes,
TransactionArgs::default(),
))
.await
{
Ok(consumption_info) => {
let (successful, failed) = consumption_info.into_parts();
for failed_note in &failed {
tracing::info!(
note.id = %failed_note.note().id(),
nullifier = %failed_note.note().nullifier(),
err = %failed_note.error().as_report(),
"note failed consumability check",
);
}
let successful_notes =
successful.into_iter().map(|s| s.note().clone()).collect::<Vec<_>>();
let successful = InputNotes::from_unauthenticated_notes(successful_notes)
.map_err(NtxError::InputNotes)?;
if successful.is_empty() {
return Err(NtxError::AllNotesFailed(failed));
}
Ok((successful, failed))
},
Err(err) => return Err(NtxError::NoteFilter(err)),
}
}
#[instrument(target = COMPONENT, name = "ntx.execute_transaction.execute", skip_all, err)]
async fn execute(
&self,
data_store: &NtxDataStore,
notes: InputNotes<InputNote>,
) -> NtxResult<ExecutedTransaction> {
let executor = self.create_executor(data_store);
let tx_args = TransactionArgs::default();
Box::pin(executor.execute_transaction(
data_store.account.id(),
data_store.reference_block.block_num(),
notes,
tx_args,
))
.await
.map_err(NtxError::Execution)
}
#[instrument(target = COMPONENT, name = "ntx.execute_transaction.prove", skip_all, err)]
async fn prove(&self, tx_inputs: &TransactionInputs) -> NtxResult<ProvenTransaction> {
(|| async { self.prover.prove(tx_inputs).await })
.retry(self.request_backoff())
.when(|err| matches!(err, TransactionProverError::Other { .. }))
.notify(|err, dur| {
log_transient_retry("remote_prover.prove", err, dur);
})
.await
.map_err(NtxError::Proving)
}
#[instrument(target = COMPONENT, name = "ntx.execute_transaction.submit", skip_all, err)]
async fn submit(
&self,
proven_tx: &ProvenTransaction,
tx_inputs: &TransactionInputs,
) -> NtxResult<()> {
(|| async { self.rpc.submit_proven_tx(proven_tx, tx_inputs).await })
.retry(self.request_backoff())
.when(is_transient_status)
.notify(|status, dur| {
log_transient_retry("rpc.submit_proven_tx", status, dur);
})
.await
.map_err(NtxError::Submission)
}
}
struct NtxDataStore {
account: Account,
reference_block: BlockHeader,
chain_mmr: Arc<PartialBlockchain>,
mast_store: TransactionMastStore,
rpc: RpcClient,
script_cache: LruCache<Word, NoteScript>,
db: Db,
fetched_scripts: Arc<Mutex<Vec<(Word, NoteScript)>>>,
storage_slots: Arc<Mutex<HashMap<(AccountId, Word), StorageSlotName>>>,
request_backoff: ExponentialBuilder,
}
impl NtxDataStore {
fn new(
account: Account,
reference_block: BlockHeader,
chain_mmr: Arc<PartialBlockchain>,
rpc: RpcClient,
script_cache: LruCache<Word, NoteScript>,
db: Db,
request_backoff: ExponentialBuilder,
) -> Self {
let mast_store = TransactionMastStore::new();
mast_store.load_account_code(account.code());
Self {
account,
reference_block,
chain_mmr,
mast_store,
rpc,
script_cache,
db,
fetched_scripts: Arc::new(Mutex::new(Vec::new())),
storage_slots: Arc::new(Mutex::new(HashMap::default())),
request_backoff,
}
}
fn rpc_backoff(&self) -> ExponentialBuilder {
self.request_backoff
}
fn take_fetched_scripts(&self) -> Vec<(Word, NoteScript)> {
self.fetched_scripts
.lock()
.expect("fetched scripts lock poisoned")
.drain(..)
.collect()
}
fn register_storage_map_slots(
&self,
account_id: AccountId,
storage_header: &AccountStorageHeader,
) {
let mut storage_slots = self.storage_slots.lock().expect("storage slots lock poisoned");
for slot_header in storage_header.slots() {
if let StorageSlotType::Map = slot_header.slot_type() {
storage_slots.insert((account_id, slot_header.value()), slot_header.name().clone());
}
}
}
}
impl DataStore for NtxDataStore {
fn get_transaction_inputs(
&self,
account_id: AccountId,
ref_blocks: BTreeSet<BlockNumber>,
) -> impl FutureMaybeSend<Result<(PartialAccount, BlockHeader, PartialBlockchain), DataStoreError>>
{
async move {
if self.account.id() != account_id {
return Err(DataStoreError::AccountNotFound(account_id));
}
match ref_blocks.last().copied() {
Some(reference) if reference == self.reference_block.block_num() => {},
Some(other) => return Err(DataStoreError::BlockNotFound(other)),
None => return Err(DataStoreError::other("no reference block requested")),
}
self.register_storage_map_slots(account_id, &self.account.storage().to_header());
let partial_account = PartialAccount::from(&self.account);
Ok((partial_account, self.reference_block.clone(), (*self.chain_mmr).clone()))
}
}
fn get_foreign_account_inputs(
&self,
foreign_account_id: AccountId,
ref_block: BlockNumber,
) -> impl FutureMaybeSend<Result<AccountInputs, DataStoreError>> {
async move {
debug_assert_eq!(ref_block, self.reference_block.block_num());
let account_inputs =
(|| async { self.rpc.get_account_inputs(foreign_account_id, ref_block).await })
.retry(self.rpc_backoff())
.when(is_transient_rpc_error)
.notify(|err, dur| {
log_transient_retry("rpc.get_account_inputs", err, dur);
})
.await
.map_err(|err| {
DataStoreError::other_with_source("failed to get account inputs", err)
})?;
self.mast_store.load_account_code(account_inputs.code());
self.register_storage_map_slots(foreign_account_id, account_inputs.storage().header());
Ok(account_inputs)
}
}
fn get_vault_asset_witnesses(
&self,
account_id: AccountId,
_vault_root: Word,
vault_keys: BTreeSet<AssetVaultKey>,
) -> impl FutureMaybeSend<Result<Vec<AssetWitness>, DataStoreError>> {
async move {
let ref_block = self.reference_block.block_num();
let witnesses = (|| {
let vault_keys = vault_keys.clone();
async move {
self.rpc
.get_vault_asset_witnesses(account_id, vault_keys, Some(ref_block))
.await
}
})
.retry(self.rpc_backoff())
.when(is_transient_rpc_error)
.notify(|err, dur| {
log_transient_retry("rpc.get_vault_asset_witnesses", err, dur);
})
.await
.map_err(|err| {
DataStoreError::other_with_source("failed to get vault asset witnesses", err)
})?;
Ok(witnesses)
}
}
fn get_storage_map_witness(
&self,
account_id: AccountId,
map_root: Word,
map_key: StorageMapKey,
) -> impl FutureMaybeSend<Result<StorageMapWitness, DataStoreError>> {
async move {
let slot_name = {
let storage_slots = self.storage_slots.lock().expect("storage slots lock poisoned");
let Some(slot_name) = storage_slots.get(&(account_id, map_root)) else {
return Err(DataStoreError::other(
"requested storage slot has not been registered",
));
};
slot_name.clone()
};
let ref_block = self.reference_block.block_num();
let witness = (|| {
let slot_name = slot_name.clone();
async move {
self.rpc
.get_storage_map_witness(account_id, slot_name, map_key, Some(ref_block))
.await
}
})
.retry(self.rpc_backoff())
.when(is_transient_rpc_error)
.notify(|err, dur| {
log_transient_retry("rpc.get_storage_map_witness", err, dur);
})
.await
.map_err(|err| {
DataStoreError::other_with_source("failed to get storage map witness", err)
})?;
Ok(witness)
}
}
fn get_note_script(
&self,
script_root: NoteScriptRoot,
) -> impl FutureMaybeSend<Result<Option<NoteScript>, DataStoreError>> {
async move {
let script_root = Word::from(script_root);
if let Some(cached_script) = self.script_cache.get(&script_root) {
return Ok(Some(cached_script));
}
if let Some(script) = self.db.lookup_note_script(script_root).await.map_err(|err| {
DataStoreError::other_with_source("failed to look up note script in local DB", err)
})? {
self.script_cache.put(script_root, script.clone());
return Ok(Some(script));
}
let maybe_script = (|| async { self.rpc.get_note_script_by_root(script_root).await })
.retry(self.rpc_backoff())
.when(is_transient_rpc_error)
.notify(|err, dur| {
log_transient_retry("rpc.get_note_script_by_root", err, dur);
})
.await
.map_err(|err| {
DataStoreError::other_with_source(
"failed to retrieve note script from RPC",
err,
)
})?;
if let Some(script) = maybe_script {
self.fetched_scripts
.lock()
.expect("fetched scripts lock poisoned")
.push((script_root, script.clone()));
self.script_cache.put(script_root, script.clone());
Ok(Some(script))
} else {
Ok(None)
}
}
}
}
impl MastForestStore for NtxDataStore {
fn get(
&self,
procedure_hash: &miden_protocol::Word,
) -> Option<std::sync::Arc<miden_protocol::MastForest>> {
self.mast_store.get(procedure_hash)
}
}
#[cfg(test)]
mod tests {
use miden_tx::TransactionProverError;
use super::{RpcError, is_transient_rpc_error, is_transient_status};
#[test]
fn transient_status_classifies_transport_codes() {
let transient = [
tonic::Status::unavailable("u"),
tonic::Status::deadline_exceeded("d"),
tonic::Status::cancelled("c"),
tonic::Status::aborted("a"),
tonic::Status::unknown("u"),
tonic::Status::internal("i"),
tonic::Status::resource_exhausted("r"),
];
for s in &transient {
assert!(is_transient_status(s), "{:?} should be transient", s.code());
}
let terminal = [
tonic::Status::invalid_argument("ia"),
tonic::Status::failed_precondition("fp"),
tonic::Status::out_of_range("oor"),
tonic::Status::not_found("nf"),
tonic::Status::already_exists("ae"),
tonic::Status::unauthenticated("ua"),
tonic::Status::permission_denied("pd"),
tonic::Status::unimplemented("ui"),
tonic::Status::data_loss("dl"),
];
for s in &terminal {
assert!(!is_transient_status(s), "{:?} should be terminal", s.code());
}
}
#[test]
fn transient_rpc_error_only_for_transient_grpc() {
let transient = RpcError::GrpcClientError(tonic::Status::unavailable("down"));
assert!(is_transient_rpc_error(&transient));
let terminal_grpc = RpcError::GrpcClientError(tonic::Status::invalid_argument("bad input"));
assert!(!is_transient_rpc_error(&terminal_grpc));
let non_grpc = RpcError::Deserialize(
miden_protocol::utils::serde::DeserializationError::InvalidValue("bad".into()),
);
assert!(!is_transient_rpc_error(&non_grpc));
}
#[test]
fn prover_other_is_the_retried_variant() {
let err = TransactionProverError::other("remote prover unreachable");
assert!(matches!(err, TransactionProverError::Other { .. }));
}
}