pub(crate) mod account_effect;
pub mod account_state;
mod execute;
pub(crate) mod inflight_note;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use account_state::TransactionCandidate;
use futures::FutureExt;
use miden_node_proto::clients::{Builder, ValidatorClient};
use miden_node_proto::domain::account::NetworkAccountId;
use miden_node_proto::domain::mempool::MempoolEvent;
use miden_node_utils::ErrorReport;
use miden_node_utils::lru_cache::LruCache;
use miden_protocol::Word;
use miden_protocol::account::{Account, AccountDelta};
use miden_protocol::block::BlockNumber;
use miden_protocol::note::{NoteScript, Nullifier};
use miden_protocol::transaction::TransactionId;
use miden_remote_prover_client::RemoteTransactionProver;
use tokio::sync::{AcquireError, RwLock, Semaphore, mpsc};
use tokio_util::sync::CancellationToken;
use url::Url;
use crate::actor::inflight_note::InflightNetworkNote;
use crate::block_producer::BlockProducerClient;
use crate::builder::ChainState;
use crate::db::Db;
use crate::store::StoreClient;
pub enum ActorNotification {
NotesFailed {
nullifiers: Vec<Nullifier>,
block_num: BlockNumber,
},
CacheNoteScript { script_root: Word, script: NoteScript },
}
pub enum ActorShutdownReason {
EventChannelClosed,
SemaphoreFailed(AcquireError),
Cancelled(NetworkAccountId),
}
#[derive(Clone)]
pub struct AccountActorContext {
pub store: StoreClient,
pub block_producer_url: Url,
pub validator_url: Url,
pub tx_prover_url: Option<Url>,
pub chain_state: Arc<RwLock<ChainState>>,
pub script_cache: LruCache<Word, NoteScript>,
pub max_notes_per_tx: NonZeroUsize,
pub max_note_attempts: usize,
pub db: Db,
pub notification_tx: mpsc::Sender<ActorNotification>,
}
#[derive(Debug)]
pub enum AccountOrigin {
Transaction(Box<Account>),
Store(NetworkAccountId),
}
impl AccountOrigin {
pub fn transaction(delta: &AccountDelta) -> Option<Self> {
let account = Account::try_from(delta).ok()?;
if account.is_network() {
Some(AccountOrigin::Transaction(account.clone().into()))
} else {
None
}
}
pub fn store(account_id: NetworkAccountId) -> Self {
AccountOrigin::Store(account_id)
}
pub fn id(&self) -> NetworkAccountId {
match self {
AccountOrigin::Transaction(account) => NetworkAccountId::try_from(account.id())
.expect("actor accounts are always network accounts"),
AccountOrigin::Store(account_id) => *account_id,
}
}
}
#[derive(Debug)]
enum ActorMode {
NoViableNotes,
NotesAvailable,
TransactionInflight(TransactionId),
}
pub struct AccountActor {
origin: AccountOrigin,
store: StoreClient,
db: Db,
mode: ActorMode,
event_rx: mpsc::Receiver<Arc<MempoolEvent>>,
cancel_token: CancellationToken,
block_producer: BlockProducerClient,
validator: ValidatorClient,
prover: Option<RemoteTransactionProver>,
chain_state: Arc<RwLock<ChainState>>,
script_cache: LruCache<Word, NoteScript>,
max_notes_per_tx: NonZeroUsize,
max_note_attempts: usize,
notification_tx: mpsc::Sender<ActorNotification>,
}
impl AccountActor {
pub fn new(
origin: AccountOrigin,
actor_context: &AccountActorContext,
event_rx: mpsc::Receiver<Arc<MempoolEvent>>,
cancel_token: CancellationToken,
) -> Self {
let block_producer = BlockProducerClient::new(actor_context.block_producer_url.clone());
let validator = Builder::new(actor_context.validator_url.clone())
.without_tls()
.with_timeout(Duration::from_secs(10))
.without_metadata_version()
.without_metadata_genesis()
.with_otel_context_injection()
.connect_lazy::<ValidatorClient>();
let prover = actor_context.tx_prover_url.clone().map(RemoteTransactionProver::new);
Self {
origin,
store: actor_context.store.clone(),
db: actor_context.db.clone(),
mode: ActorMode::NoViableNotes,
event_rx,
cancel_token,
block_producer,
validator,
prover,
chain_state: actor_context.chain_state.clone(),
script_cache: actor_context.script_cache.clone(),
max_notes_per_tx: actor_context.max_notes_per_tx,
max_note_attempts: actor_context.max_note_attempts,
notification_tx: actor_context.notification_tx.clone(),
}
}
pub async fn run(mut self, semaphore: Arc<Semaphore>) -> ActorShutdownReason {
let account_id = self.origin.id();
let block_num = self.chain_state.read().await.chain_tip_header.block_num();
let has_notes = self
.db
.has_available_notes(account_id, block_num, self.max_note_attempts)
.await
.expect("actor should be able to check for available notes");
if has_notes {
self.mode = ActorMode::NotesAvailable;
}
loop {
let tx_permit_acquisition = match self.mode {
ActorMode::NoViableNotes | ActorMode::TransactionInflight(_) => {
std::future::pending().boxed()
},
ActorMode::NotesAvailable => semaphore.acquire().boxed(),
};
tokio::select! {
_ = self.cancel_token.cancelled() => {
return ActorShutdownReason::Cancelled(account_id);
}
event = self.event_rx.recv() => {
let Some(event) = event else {
return ActorShutdownReason::EventChannelClosed;
};
if let ActorMode::TransactionInflight(awaited_id) = self.mode {
let should_wake = match event.as_ref() {
MempoolEvent::TransactionAdded { id, .. } => *id == awaited_id,
MempoolEvent::BlockCommitted { txs, .. } => {
txs.contains(&awaited_id)
},
MempoolEvent::TransactionsReverted(tx_ids) => {
tx_ids.contains(&awaited_id)
},
};
if should_wake {
self.mode = ActorMode::NotesAvailable;
}
} else {
self.mode = ActorMode::NotesAvailable;
}
},
permit = tx_permit_acquisition => {
match permit {
Ok(_permit) => {
let chain_state = self.chain_state.read().await.clone();
let tx_candidate = self.select_candidate_from_db(
account_id,
chain_state,
).await;
if let Some(tx_candidate) = tx_candidate {
self.execute_transactions(account_id, tx_candidate).await;
} else {
self.mode = ActorMode::NoViableNotes;
}
}
Err(err) => {
return ActorShutdownReason::SemaphoreFailed(err);
}
}
}
}
}
}
async fn select_candidate_from_db(
&self,
account_id: NetworkAccountId,
chain_state: ChainState,
) -> Option<TransactionCandidate> {
let block_num = chain_state.chain_tip_header.block_num();
let max_notes = self.max_notes_per_tx.get();
let (latest_account, notes) = self
.db
.select_candidate(account_id, block_num, self.max_note_attempts)
.await
.expect("actor should be able to query DB for candidate");
let account = latest_account?;
let notes: Vec<_> = notes.into_iter().take(max_notes).collect();
if notes.is_empty() {
return None;
}
let (chain_tip_header, chain_mmr) = chain_state.into_parts();
Some(TransactionCandidate {
account,
notes,
chain_tip_header,
chain_mmr,
})
}
#[tracing::instrument(name = "ntx.actor.execute_transactions", skip(self, tx_candidate))]
async fn execute_transactions(
&mut self,
account_id: NetworkAccountId,
tx_candidate: TransactionCandidate,
) {
let block_num = tx_candidate.chain_tip_header.block_num();
let context = execute::NtxContext::new(
self.block_producer.clone(),
self.validator.clone(),
self.prover.clone(),
self.store.clone(),
self.script_cache.clone(),
self.db.clone(),
);
let notes = tx_candidate.notes.clone();
let account_id = tx_candidate.account.id();
let note_ids: Vec<_> = notes.iter().map(|n| n.to_inner().as_note().id()).collect();
tracing::info!(
%account_id,
?note_ids,
num_notes = notes.len(),
"executing network transaction",
);
let execution_result = context.execute_transaction(tx_candidate).await;
match execution_result {
Ok((tx_id, failed, scripts_to_cache)) if failed.is_empty() => {
self.cache_note_scripts(scripts_to_cache).await;
self.mode = ActorMode::TransactionInflight(tx_id);
},
Ok((tx_id, failed, scripts_to_cache)) => {
tracing::info!(
%account_id,
%tx_id,
num_failed = failed.len(),
"network transaction executed with some failed notes",
);
self.cache_note_scripts(scripts_to_cache).await;
let nullifiers: Vec<_> =
failed.into_iter().map(|note| note.note.nullifier()).collect();
self.mark_notes_failed(&nullifiers, block_num).await;
self.mode = ActorMode::TransactionInflight(tx_id);
},
Err(err) => {
tracing::error!(
%account_id,
?note_ids,
err = err.as_report(),
"network transaction failed",
);
self.mode = ActorMode::NoViableNotes;
let nullifiers: Vec<_> = notes.iter().map(InflightNetworkNote::nullifier).collect();
self.mark_notes_failed(&nullifiers, block_num).await;
},
}
}
async fn cache_note_scripts(&self, scripts: Vec<(Word, NoteScript)>) {
for (script_root, script) in scripts {
let _ = self
.notification_tx
.send(ActorNotification::CacheNoteScript { script_root, script })
.await;
}
}
async fn mark_notes_failed(&self, nullifiers: &[Nullifier], block_num: BlockNumber) {
let _ = self
.notification_tx
.send(ActorNotification::NotesFailed {
nullifiers: nullifiers.to_vec(),
block_num,
})
.await;
}
}
#[expect(clippy::cast_precision_loss, clippy::cast_sign_loss)]
fn has_backoff_passed(
chain_tip: BlockNumber,
last_attempt: Option<BlockNumber>,
attempts: usize,
) -> bool {
if attempts == 0 {
return true;
}
let blocks_passed = last_attempt
.and_then(|last| chain_tip.checked_sub(last.as_u32()))
.unwrap_or_default();
let backoff_threshold = (0.25 * attempts as f64).exp().round() as usize;
blocks_passed.as_usize() > backoff_threshold
}
#[cfg(test)]
mod tests {
use miden_protocol::block::BlockNumber;
use super::has_backoff_passed;
#[rstest::rstest]
#[test]
#[case::all_zero(Some(BlockNumber::GENESIS), BlockNumber::GENESIS, 0, true)]
#[case::no_attempts(None, BlockNumber::GENESIS, 0, true)]
#[case::one_attempt(Some(BlockNumber::GENESIS), BlockNumber::from(2), 1, true)]
#[case::three_attempts(Some(BlockNumber::GENESIS), BlockNumber::from(3), 3, true)]
#[case::ten_attempts(Some(BlockNumber::GENESIS), BlockNumber::from(13), 10, true)]
#[case::twenty_attempts(Some(BlockNumber::GENESIS), BlockNumber::from(149), 20, true)]
#[case::one_attempt_false(Some(BlockNumber::GENESIS), BlockNumber::from(1), 1, false)]
#[case::three_attempts_false(Some(BlockNumber::GENESIS), BlockNumber::from(2), 3, false)]
#[case::ten_attempts_false(Some(BlockNumber::GENESIS), BlockNumber::from(12), 10, false)]
#[case::twenty_attempts_false(Some(BlockNumber::GENESIS), BlockNumber::from(148), 20, false)]
fn backoff_has_passed(
#[case] last_attempt_block_num: Option<BlockNumber>,
#[case] current_block_num: BlockNumber,
#[case] attempt_count: usize,
#[case] backoff_should_have_passed: bool,
) {
assert_eq!(
backoff_should_have_passed,
has_backoff_passed(current_block_num, last_attempt_block_num, attempt_count)
);
}
}