mod allowlist;
pub mod candidate;
mod execute;
use std::num::{NonZeroU16, NonZeroUsize};
use std::sync::Arc;
use std::time::Duration;
use allowlist::{NoteScriptNotAllowlisted, partition_by_allowlist};
use anyhow::Context;
use candidate::TransactionCandidate;
use futures::FutureExt;
use miden_node_utils::ErrorReport;
use miden_node_utils::lru_cache::LruCache;
use miden_protocol::Word;
use miden_protocol::account::AccountId;
use miden_protocol::block::BlockNumber;
use miden_protocol::note::{NoteScript, Nullifier};
use miden_protocol::transaction::{TransactionId, TransactionScript};
use miden_remote_prover_client::RemoteTransactionProver;
use miden_standards::code_builder::CodeBuilder;
use miden_tx::FailedNote;
use tokio::sync::{Notify, Semaphore, mpsc};
use crate::NoteError;
use crate::chain_state::{ChainState, SharedChainState};
use crate::clients::RpcClient;
use crate::db::Db;
/// Compiles the transaction script that sets the expiration block delta.
///
/// The generated program pushes `delta` onto the stack and executes
/// `::miden::protocol::tx::update_expiration_block_delta`.
///
/// # Errors
///
/// Returns an error if the script source fails to compile.
pub(crate) fn expiration_tx_script(delta: NonZeroU16) -> anyhow::Result<TransactionScript> {
    let blocks = delta.get();
    let masm = format!(
        "begin\n push.{blocks} exec.::miden::protocol::tx::update_expiration_block_delta\nend"
    );
    let compiled = CodeBuilder::new().compile_tx_script(masm);
    compiled.context("failed to compile network-tx expiration script")
}
/// Requests an `AccountActor` sends over its `request` channel.
///
/// The receiving side of the channel is not part of this file.
pub enum ActorRequest {
/// Reports notes that failed, together with the chain-tip block number the actor was
/// working against when they failed.
NotesFailed {
/// Nullifier of each failed note paired with the error recorded for it.
failed_notes: Vec<(Nullifier, NoteError)>,
/// Block number of the chain tip used for the failed attempt.
block_num: BlockNumber,
/// Acknowledgement channel; `AccountActor::mark_notes_failed` awaits the paired
/// receiver after sending this request.
ack_tx: tokio::sync::oneshot::Sender<()>,
},
/// Carries a note script and its root; sent by `AccountActor::cache_note_scripts` for
/// each script returned by a successful transaction execution.
CacheNoteScript { script_root: Word, script: NoteScript },
}
/// Clients for the remote services used by an actor.
///
/// Both clients are cloned and handed to `execute::NtxContext` for every transaction
/// execution.
#[derive(Clone)]
pub struct GrpcClients {
/// RPC client.
pub rpc: RpcClient,
/// Remote transaction prover client.
pub prover: RemoteTransactionProver,
}
/// State handed to every actor; `AccountActor::new` clones it from the context.
#[derive(Clone)]
pub struct State {
/// Database handle used for account, note and last-transaction queries.
pub db: Db,
/// Shared chain state; read for the chain tip block number and cloned as a snapshot
/// before selecting a transaction candidate.
pub chain: Arc<SharedChainState>,
/// Cache of note scripts keyed by `Word` (presumably the script root — confirm against
/// `execute::NtxContext`).
pub script_cache: LruCache<Word, NoteScript>,
/// Pre-compiled transaction script passed to `execute::NtxContext` on every execution.
pub expiration_script: TransactionScript,
}
/// Tunable parameters for an `AccountActor`.
#[derive(Debug, Clone, Copy)]
pub struct ActorConfig {
/// Upper bound on the number of allowlisted notes placed in one transaction candidate.
pub max_notes_per_tx: NonZeroUsize,
/// Passed to the database when checking for and selecting notes (presumably the number
/// of failed attempts after which a note is no longer offered — confirm in `Db`).
pub max_note_attempts: usize,
/// How long the actor waits without viable notes (or for its account to be committed)
/// before shutting itself down.
pub idle_timeout: Duration,
/// Passed to `execute::NtxContext` (presumably a VM cycle limit — confirm there).
pub max_cycles: u32,
/// Number of blocks after submission at which a not-yet-landed transaction is treated
/// as expired by `AccountActor::reevaluate_mode`.
pub tx_expiration_delta: NonZeroU16,
/// Passed to `execute::NtxContext` (presumably the initial retry backoff — confirm there).
pub request_backoff_initial: Duration,
/// Passed to `execute::NtxContext` (presumably the maximum retry backoff — confirm there).
pub request_backoff_max: Duration,
}
/// Everything `AccountActor::new` needs besides the account ID and the notifier:
/// service clients, shared state, configuration and the request channel sender.
#[derive(Clone)]
pub struct AccountActorContext {
/// Clients for remote services; cloned into each actor.
pub clients: GrpcClients,
/// Shared state; cloned into each actor.
pub state: State,
/// Actor configuration; copied into each actor.
pub config: ActorConfig,
/// Sender half of the channel on which actors emit `ActorRequest`s.
pub request_tx: mpsc::Sender<ActorRequest>,
}
#[cfg(test)]
impl AccountActorContext {
    /// Builds a context for tests around the given database handle.
    ///
    /// Both clients point at `http://127.0.0.1:1`, the chain state is a mock block 0
    /// header with an empty partial MMR, and all limits are set to small values. The
    /// receiving half of the request channel is dropped when this function returns.
    pub fn test(db: &crate::db::Db) -> Self {
        use miden_protocol::crypto::merkle::mmr::{Forest, MmrPeaks, PartialMmr};
        use url::Url;

        use crate::chain_state::SharedChainState;
        use crate::clients::RpcClient;
        use crate::test_utils::mock_block_header;

        let url = Url::parse("http://127.0.0.1:1").unwrap();

        // Chain state: mock genesis header on top of an empty MMR.
        let genesis_header = mock_block_header(0_u32.into());
        let empty_forest = Forest::new(0).expect("forest 0 is valid");
        let empty_peaks = MmrPeaks::new(empty_forest, vec![]).unwrap();
        let chain = Arc::new(SharedChainState::new(
            genesis_header,
            PartialMmr::from_peaks(empty_peaks),
        ));

        let (request_tx, _request_rx) = mpsc::channel(1);

        let rpc = RpcClient::new(
            url.clone(),
            miden_protocol::Word::default(),
            Duration::from_millis(100),
            Duration::from_secs(30),
        )
        .expect("rpc client should be constructed");
        let prover = RemoteTransactionProver::new(url.as_str());

        let single = NonZeroUsize::new(1).unwrap();
        let tx_expiration_delta = NonZeroU16::new(30).unwrap();

        let state = State {
            db: db.clone(),
            chain,
            script_cache: LruCache::new(single),
            expiration_script: expiration_tx_script(tx_expiration_delta)
                .expect("expiration script should compile"),
        };

        let config = ActorConfig {
            max_notes_per_tx: single,
            max_note_attempts: 1,
            idle_timeout: Duration::from_secs(60),
            max_cycles: 1 << 18,
            tx_expiration_delta,
            request_backoff_initial: Duration::from_millis(1),
            request_backoff_max: Duration::from_millis(10),
        };

        Self {
            clients: GrpcClients { rpc, prover },
            state,
            config,
            request_tx,
        }
    }
}
/// The state an `AccountActor` is in while running its main loop.
#[derive(Debug)]
enum ActorMode {
/// Nothing to execute: the run loop does not try to acquire a transaction permit and
/// arms the idle timeout instead.
NoViableNotes,
/// Notes may be available: the run loop tries to acquire a semaphore permit and then
/// selects and executes a transaction candidate.
NotesAvailable,
/// A transaction was submitted; the actor only reacts to notifications until the
/// transaction is seen as the account's last transaction or is considered expired.
WaitForBlock {
/// ID of the submitted transaction.
submitted_tx_id: TransactionId,
/// Chain-tip block number the transaction was executed against.
submitted_at: BlockNumber,
},
}
/// Actor that builds and executes network transactions for a single account.
pub struct AccountActor {
/// The account this actor works on.
account_id: AccountId,
/// Clients for remote services, handed to the executor for each transaction.
clients: GrpcClients,
/// Shared database, chain state, script cache and expiration script.
state: State,
/// Limits and timeouts for this actor.
config: ActorConfig,
/// Wake-up signal; the actor re-evaluates its mode whenever it is notified.
notify: Arc<Notify>,
/// Channel on which the actor sends `ActorRequest`s.
request: mpsc::Sender<ActorRequest>,
}
impl AccountActor {
pub fn new(
account_id: AccountId,
actor_context: &AccountActorContext,
notify: Arc<Notify>,
) -> Self {
Self {
account_id,
clients: actor_context.clients.clone(),
state: actor_context.state.clone(),
config: actor_context.config,
notify,
request: actor_context.request_tx.clone(),
}
}
/// Runs the actor until it deactivates or an error occurs.
///
/// The actor first waits for its account to be committed; if that wait times out it
/// returns `Ok(())`. It then loops, reacting to whichever of these happens first:
///
/// - a notification: the mode is re-evaluated;
/// - a semaphore permit (only in `NotesAvailable`): a candidate is selected from the
///   database and executed while the permit is held;
/// - the idle timeout (only in `NoViableNotes`): the actor returns `Ok(())`.
///
/// # Errors
///
/// Returns an error if a database query fails or the semaphore is closed.
pub async fn run(self, semaphore: Arc<Semaphore>) -> anyhow::Result<()> {
    let account_id = self.account_id;

    let committed = self.wait_for_committed_account(account_id).await?;
    if !committed {
        return Ok(());
    }

    // Pick the initial mode from what the database currently holds for this account.
    let chain_tip = self.state.chain.chain_tip_block_number();
    let notes_pending = self
        .state
        .db
        .has_available_notes(account_id, chain_tip, self.config.max_note_attempts)
        .await
        .context("failed to check for available notes")?;
    let mut mode = if notes_pending {
        ActorMode::NotesAvailable
    } else {
        ActorMode::NoViableNotes
    };

    loop {
        // Transaction execution is enabled only while notes are available.
        let tx_permit_acquisition = if matches!(mode, ActorMode::NotesAvailable) {
            semaphore.acquire().boxed()
        } else {
            std::future::pending().boxed()
        };

        // The idle timer runs only while there is nothing to do.
        let idle_timeout_sleep = if matches!(mode, ActorMode::NoViableNotes) {
            tokio::time::sleep(self.config.idle_timeout).boxed()
        } else {
            std::future::pending().boxed()
        };

        tokio::select! {
            _ = self.notify.notified() => {
                mode = self.reevaluate_mode(account_id, mode).await?;
            },
            permit = tx_permit_acquisition => {
                // Held until the end of this arm, i.e. for the whole execution.
                let _permit = permit.context("semaphore closed")?;
                let snapshot = self.state.chain.get_cloned();
                mode = match self.select_candidate_from_db(account_id, snapshot).await? {
                    Some(candidate) => self.execute_transactions(account_id, candidate).await,
                    None => ActorMode::NoViableNotes,
                };
            }
            () = idle_timeout_sleep => {
                tracing::info!(%account_id, "Account actor deactivated due to idle timeout");
                return Ok(());
            }
        }
    }
}
/// Computes the next mode after the actor has been notified.
///
/// Outside of `WaitForBlock` the result is always `NotesAvailable`. While waiting for
/// a submitted transaction the actor moves to `NotesAvailable` once either
///
/// - the account's last transaction in the database is the submitted one, or
/// - the chain tip is at least `tx_expiration_delta` blocks past `submitted_at`;
///
/// otherwise it keeps waiting.
///
/// # Errors
///
/// Returns an error if the last-transaction lookup fails.
async fn reevaluate_mode(
    &self,
    account_id: AccountId,
    mode: ActorMode,
) -> anyhow::Result<ActorMode> {
    let ActorMode::WaitForBlock { submitted_tx_id, submitted_at } = mode else {
        return Ok(ActorMode::NotesAvailable);
    };

    let last_tx = self
        .state
        .db
        .account_last_tx(account_id)
        .await
        .context("failed to check submitted tx landing")?;
    if last_tx == Some(submitted_tx_id) {
        return Ok(ActorMode::NotesAvailable);
    }

    // A tip behind `submitted_at` counts as zero elapsed blocks.
    let tip = self.state.chain.chain_tip_block_number();
    let blocks_waited = tip.checked_sub(submitted_at.as_u32()).unwrap_or_default().as_u32();
    let expiry = u32::from(self.config.tx_expiration_delta.get());
    if blocks_waited < expiry {
        return Ok(ActorMode::WaitForBlock { submitted_tx_id, submitted_at });
    }

    tracing::info!(
        %account_id,
        %submitted_at,
        current_tip = %tip,
        delta = self.config.tx_expiration_delta,
        "submitted transaction expired",
    );
    Ok(ActorMode::NotesAvailable)
}
async fn select_candidate_from_db(
&self,
account_id: AccountId,
chain_state: ChainState,
) -> anyhow::Result<Option<TransactionCandidate>> {
let block_num = chain_state.chain_tip_header.block_num();
let max_notes = self.config.max_notes_per_tx.get();
let (latest_account, notes) = self
.state
.db
.select_candidate(account_id, block_num, self.config.max_note_attempts)
.await
.context("failed to query DB for transaction candidate")?;
let Some(account) = latest_account else {
tracing::info!(account_id = %account_id, "Account no longer exists in DB");
return Ok(None);
};
let partitioned_notes = partition_by_allowlist(&account, notes)
.context("failed to read network account note allowlist")?;
if !partitioned_notes.rejected.is_empty() {
let failed_notes = partitioned_notes
.rejected
.into_iter()
.map(|(nullifier, script_root)| {
let error: NoteError = Arc::new(NoteScriptNotAllowlisted::new(script_root));
(nullifier, error)
})
.collect::<Vec<_>>();
tracing::info!(
%account_id,
rejected_count = failed_notes.len(),
"dropping network notes whose script roots are not allowlisted",
);
self.mark_notes_failed(&failed_notes, block_num).await;
}
let notes: Vec<_> = partitioned_notes.allowed.into_iter().take(max_notes).collect();
if notes.is_empty() {
return Ok(None);
}
let (chain_tip_header, chain_mmr) = chain_state.into_parts();
Ok(Some(TransactionCandidate {
account,
notes,
chain_tip_header,
chain_mmr,
}))
}
async fn wait_for_committed_account(&self, account_id: AccountId) -> anyhow::Result<bool> {
if self
.state
.db
.has_committed_account(account_id)
.await
.context("failed to check for committed account")?
{
return Ok(true);
}
loop {
tokio::select! {
_ = self.notify.notified() => {
if self
.state
.db
.has_committed_account(account_id)
.await
.context("failed to check for committed account")?
{
tracing::info!(account.id=%account_id, "Account committed, starting normal operation");
return Ok(true);
}
}
_ = tokio::time::sleep(self.config.idle_timeout) => {
tracing::info!(
%account_id,
"Account actor deactivated while waiting for account commit",
);
return Ok(false);
}
}
}
}
/// Executes a network transaction for `tx_candidate` and returns the actor's next mode.
///
/// On success the returned note scripts are sent off for caching and any failed notes
/// are reported; the actor then waits for the submitted transaction, unless every note
/// of the candidate failed, in which case it has no viable notes.
///
/// On error all affected notes are reported as failed: with the per-note errors when
/// the executor returned `NtxError::AllNotesFailed`, otherwise with the shared
/// execution error for every note of the candidate. The next mode is `NoViableNotes`.
#[tracing::instrument(name = "ntx.actor.execute_transactions", skip(self, tx_candidate))]
async fn execute_transactions(
    &self,
    account_id: AccountId,
    tx_candidate: TransactionCandidate,
) -> ActorMode {
    let block_num = tx_candidate.chain_tip_header.block_num();
    let context = execute::NtxContext::new(
        self.clients.prover.clone(),
        self.clients.rpc.clone(),
        self.state.script_cache.clone(),
        self.state.db.clone(),
        self.config.max_cycles,
        self.state.expiration_script.clone(),
        self.config.request_backoff_initial,
        self.config.request_backoff_max,
    );

    // The candidate is moved into the executor below, but its notes are still needed
    // afterwards to report failures.
    let notes = tx_candidate.notes.clone();
    let account_id = tx_candidate.account.id();
    let note_ids: Vec<_> = notes.iter().map(|n| n.as_note().id()).collect();
    tracing::info!(
        %account_id,
        ?note_ids,
        num_notes = notes.len(),
        "executing network transaction",
    );

    let execution_result = context.execute_transaction(tx_candidate).await;
    match execution_result {
        Ok((tx_id, failed, scripts_to_cache)) => {
            // Only mention failed notes in the message when there are any.
            if failed.is_empty() {
                tracing::info!(
                    %account_id,
                    %tx_id,
                    num_failed = failed.len(),
                    "network transaction executed",
                );
            } else {
                tracing::info!(
                    %account_id,
                    %tx_id,
                    num_failed = failed.len(),
                    "network transaction executed with some failed notes",
                );
            }
            self.cache_note_scripts(scripts_to_cache).await;

            // Computed before `failed` is consumed below.
            let all_notes_failed = failed.len() == notes.len();
            if !failed.is_empty() {
                let failed_notes = log_failed_notes(failed);
                self.mark_notes_failed(&failed_notes, block_num).await;
            }

            // NOTE(review): this branch discards `tx_id` without waiting for a block.
            // Confirm whether `execute_transaction` can return `Ok` when every note
            // failed, given that `NtxError::AllNotesFailed` exists for that case.
            if all_notes_failed {
                ActorMode::NoViableNotes
            } else {
                ActorMode::WaitForBlock {
                    submitted_tx_id: tx_id,
                    submitted_at: block_num,
                }
            }
        },
        Err(err) => {
            let error_msg = err.as_report();
            tracing::error!(
                %account_id,
                ?note_ids,
                err = %error_msg,
                "network transaction failed",
            );
            let failed_notes: Vec<_> = match err {
                execute::NtxError::AllNotesFailed(per_note) => log_failed_notes(per_note),
                other => {
                    // Any other error is attributed to every note of the candidate.
                    let error: NoteError = Arc::new(other);
                    notes
                        .iter()
                        .map(|note| {
                            tracing::info!(
                                note.id = %note.as_note().id(),
                                nullifier = %note.as_note().nullifier(),
                                err = %error_msg,
                                "note failed: transaction execution error",
                            );
                            (note.as_note().nullifier(), error.clone())
                        })
                        .collect()
                },
            };
            self.mark_notes_failed(&failed_notes, block_num).await;
            ActorMode::NoViableNotes
        },
    }
}
/// Sends one `ActorRequest::CacheNoteScript` per script over the request channel.
///
/// Stops at the first failed send; the remaining scripts are dropped.
async fn cache_note_scripts(&self, scripts: Vec<(Word, NoteScript)>) {
    for (script_root, script) in scripts {
        let request = ActorRequest::CacheNoteScript { script_root, script };
        let Ok(()) = self.request.send(request).await else {
            break;
        };
    }
}
/// Reports `failed_notes` over the request channel and waits for the acknowledgement.
///
/// Best effort: if the request cannot be sent, the method returns immediately, and an
/// error on the acknowledgement channel is ignored.
async fn mark_notes_failed(
    &self,
    failed_notes: &[(Nullifier, NoteError)],
    block_num: BlockNumber,
) {
    let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
    let request = ActorRequest::NotesFailed {
        failed_notes: failed_notes.to_vec(),
        block_num,
        ack_tx,
    };

    // Only wait for the acknowledgement if the request was actually delivered.
    if self.request.send(request).await.is_ok() {
        let _ = ack_rx.await;
    }
}
}
/// Logs every failed note and converts it into a `(nullifier, error)` pair.
///
/// The error of each note is rendered as a report string and wrapped in a
/// `std::io::Error`, so the returned `NoteError` carries only that text.
fn log_failed_notes(failed: Vec<FailedNote>) -> Vec<(Nullifier, NoteError)> {
    let mut reported = Vec::with_capacity(failed.len());
    for failed_note in failed {
        let error_msg = failed_note.error().as_report();
        tracing::info!(
            note.id = %failed_note.note().id(),
            nullifier = %failed_note.note().nullifier(),
            err = %error_msg,
            "note failed: consumability check",
        );
        let error: NoteError = Arc::new(std::io::Error::other(error_msg));
        reported.push((failed_note.note().nullifier(), error));
    }
    reported
}
#[cfg(test)]
mod tests {
    use std::num::NonZeroU16;

    use super::expiration_tx_script;

    /// The expiration script must compile for the smallest, a typical and the largest
    /// delta, and different deltas must produce scripts with different roots.
    #[test]
    fn expiration_script_compiles_and_encodes_delta() {
        let smallest = NonZeroU16::new(1).unwrap();
        let typical = NonZeroU16::new(30).unwrap();

        let script_one = expiration_tx_script(smallest).expect("delta 1 should compile");
        let script_thirty = expiration_tx_script(typical).expect("delta 30 should compile");
        let script_max =
            expiration_tx_script(NonZeroU16::MAX).expect("delta u16::MAX should compile");

        assert_ne!(
            script_one.root(),
            script_thirty.root(),
            "distinct deltas must yield distinct scripts"
        );
        assert_ne!(
            script_thirty.root(),
            script_max.root(),
            "distinct deltas must yield distinct scripts"
        );
    }
}