pub mod candidate;
mod execute;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use candidate::TransactionCandidate;
use futures::FutureExt;
use miden_node_proto::domain::account::NetworkAccountId;
use miden_node_utils::ErrorReport;
use miden_node_utils::lru_cache::LruCache;
use miden_protocol::Word;
use miden_protocol::account::{Account, AccountDelta};
use miden_protocol::block::BlockNumber;
use miden_protocol::note::{NoteScript, Nullifier};
use miden_protocol::transaction::TransactionId;
use miden_remote_prover_client::RemoteTransactionProver;
use miden_tx::FailedNote;
use tokio::sync::{Notify, RwLock, Semaphore, mpsc};
use tokio_util::sync::CancellationToken;
use crate::NoteError;
use crate::chain_state::ChainState;
use crate::clients::{BlockProducerClient, StoreClient, ValidatorClient};
use crate::db::Db;
/// Requests that an [`AccountActor`] submits over its `request_tx` channel.
pub enum ActorRequest {
/// Reports notes that failed during a transaction attempt.
///
/// After sending this request the actor awaits the receiving half paired with `ack_tx`,
/// so it resumes only once the handler has signalled or dropped `ack_tx`.
NotesFailed {
/// Nullifier of every failed note, paired with the error recorded for it.
failed_notes: Vec<(Nullifier, NoteError)>,
/// Block number of the chain tip the failed transaction candidate was built against.
block_num: BlockNumber,
/// Acknowledgement channel; the sending actor waits on its receiving half.
ack_tx: tokio::sync::oneshot::Sender<()>,
},
/// Hands over a note script, keyed by its root, that transaction execution returned for
/// caching.
///
/// NOTE(review): presumably the handler inserts it into the shared script cache — confirm
/// against the code that receives these requests.
CacheNoteScript { script_root: Word, script: NoteScript },
}
/// Handles and settings that [`AccountActor::new`] copies into every actor it builds.
#[derive(Clone)]
pub struct AccountActorContext {
/// Store client; forwarded to the transaction execution context.
pub store: StoreClient,
/// Block producer client; forwarded to the transaction execution context.
pub block_producer: BlockProducerClient,
/// Validator client; forwarded to the transaction execution context.
pub validator: ValidatorClient,
/// Optional remote transaction prover; forwarded to the transaction execution context.
pub prover: Option<RemoteTransactionProver>,
/// Shared chain state from which actors read the current chain tip.
pub chain_state: Arc<RwLock<ChainState>>,
/// Note script cache; forwarded to the transaction execution context.
pub script_cache: LruCache<Word, NoteScript>,
/// Upper bound on the number of notes placed into a single transaction candidate.
pub max_notes_per_tx: NonZeroUsize,
/// Attempt limit forwarded to the DB queries that look up available notes.
///
/// NOTE(review): presumably notes that already reached this many attempts are filtered
/// out by the DB — confirm in `Db`.
pub max_note_attempts: usize,
/// How long an actor without viable notes waits before it shuts itself down.
pub idle_timeout: Duration,
/// Database handle used for note, account, and transaction status queries.
pub db: Db,
/// Sending half of the channel on which actors submit [`ActorRequest`]s.
pub request_tx: mpsc::Sender<ActorRequest>,
/// Forwarded as `max_cycles` to the transaction execution context.
pub max_cycles: u32,
}
#[cfg(test)]
impl AccountActorContext {
    /// Builds a context suitable for unit tests around the given database handle.
    ///
    /// All clients point at `http://127.0.0.1:1`, the chain state consists of a mock block
    /// header for block 0 with an empty MMR, and every limit is set to its smallest useful
    /// value. The receiving half of the request channel is dropped when this function returns,
    /// so any later send on `request_tx` fails.
    pub fn test(db: &crate::db::Db) -> Self {
        use miden_protocol::crypto::merkle::mmr::{Forest, MmrPeaks, PartialMmr};
        use tokio::sync::RwLock;
        use url::Url;

        use crate::chain_state::ChainState;
        use crate::clients::StoreClient;
        use crate::test_utils::mock_block_header;

        let endpoint = Url::parse("http://127.0.0.1:1").unwrap();

        // Chain state: mock header for block 0 on top of an empty MMR.
        let tip_header = mock_block_header(0_u32.into());
        let empty_peaks = MmrPeaks::new(Forest::new(0), vec![]).unwrap();
        let empty_mmr = PartialMmr::from_peaks(empty_peaks);
        let chain_state = Arc::new(RwLock::new(ChainState::new(tip_header, empty_mmr)));

        let (request_tx, _request_rx) = mpsc::channel(1);

        let block_producer = BlockProducerClient::new(endpoint.clone());
        let validator = ValidatorClient::new(endpoint.clone());
        let store = StoreClient::new(endpoint);
        let one = NonZeroUsize::new(1).unwrap();

        Self {
            store,
            block_producer,
            validator,
            prover: None,
            chain_state,
            script_cache: LruCache::new(one),
            max_notes_per_tx: one,
            max_note_attempts: 1,
            idle_timeout: Duration::from_secs(60),
            db: db.clone(),
            request_tx,
            max_cycles: 1 << 18,
        }
    }
}
/// Describes where the account handled by an actor comes from.
#[derive(Debug)]
pub enum AccountOrigin {
/// The full account, reconstructed from an account delta
/// (see [`AccountOrigin::transaction`]).
Transaction(Box<Account>),
/// Only the network account ID is known (see [`AccountOrigin::store`]).
///
/// NOTE(review): presumably the account data itself is loaded from persistent storage
/// when needed — confirm against the code that consumes this variant.
Store(NetworkAccountId),
}
impl AccountOrigin {
    /// Builds an [`AccountOrigin::Transaction`] from an account delta.
    ///
    /// Returns `None` when the delta cannot be converted into a full [`Account`], or when the
    /// resulting account is not a network account.
    pub fn transaction(delta: &AccountDelta) -> Option<Self> {
        let account = Account::try_from(delta).ok()?;
        // `account` is owned at this point, so it is moved into the box instead of being
        // cloned first.
        account.is_network().then(|| AccountOrigin::Transaction(Box::new(account)))
    }

    /// Builds an [`AccountOrigin::Store`] for the given network account ID.
    pub fn store(account_id: NetworkAccountId) -> Self {
        AccountOrigin::Store(account_id)
    }

    /// Returns the network account ID of the underlying account.
    ///
    /// # Panics
    ///
    /// Panics if a [`AccountOrigin::Transaction`] holds an account whose ID cannot be converted
    /// into a [`NetworkAccountId`]. [`AccountOrigin::transaction`] only accepts network
    /// accounts, so this indicates a broken invariant.
    pub fn id(&self) -> NetworkAccountId {
        match self {
            AccountOrigin::Transaction(account) => NetworkAccountId::try_from(account.id())
                .expect("actor accounts are always network accounts"),
            AccountOrigin::Store(account_id) => *account_id,
        }
    }
}
/// State of an [`AccountActor`], which decides what its event loop waits for.
#[derive(Debug)]
enum ActorMode {
/// No transaction is attempted; the idle timeout runs while the actor is in this mode.
/// This is the mode a freshly constructed actor starts in.
NoViableNotes,
/// The actor tries to acquire a semaphore permit and then selects a transaction candidate.
NotesAvailable,
/// The last execution produced the transaction with this ID. No new transaction is
/// attempted until a notification moves the actor out of this mode.
TransactionInflight(TransactionId),
}
/// Actor that builds and submits network transactions for a single account.
///
/// Apart from `origin`, `mode`, `notify`, and `cancel_token`, every field is copied from the
/// [`AccountActorContext`] passed to [`AccountActor::new`].
pub struct AccountActor {
/// Origin of the account this actor is responsible for; supplies the account ID.
origin: AccountOrigin,
/// Store client; forwarded to the transaction execution context.
store: StoreClient,
/// Database handle used for note, account, and transaction status queries.
db: Db,
/// Current mode of the event loop.
mode: ActorMode,
/// Wake-up signal; on notification the actor re-evaluates its mode.
notify: Arc<Notify>,
/// Cancelling this token makes `run` return `Ok(())`.
cancel_token: CancellationToken,
/// Block producer client; forwarded to the transaction execution context.
block_producer: BlockProducerClient,
/// Validator client; forwarded to the transaction execution context.
validator: ValidatorClient,
/// Optional remote transaction prover; forwarded to the transaction execution context.
prover: Option<RemoteTransactionProver>,
/// Shared chain state from which the current chain tip is read.
chain_state: Arc<RwLock<ChainState>>,
/// Note script cache; forwarded to the transaction execution context.
script_cache: LruCache<Word, NoteScript>,
/// Upper bound on the number of notes placed into a single transaction candidate.
max_notes_per_tx: NonZeroUsize,
/// Attempt limit forwarded to the DB queries that look up available notes.
max_note_attempts: usize,
/// How long the actor stays in [`ActorMode::NoViableNotes`] before shutting down.
idle_timeout: Duration,
/// Sending half of the channel used to submit [`ActorRequest`]s.
request_tx: mpsc::Sender<ActorRequest>,
/// Forwarded as `max_cycles` to the transaction execution context.
max_cycles: u32,
}
impl AccountActor {
pub fn new(
origin: AccountOrigin,
actor_context: &AccountActorContext,
notify: Arc<Notify>,
cancel_token: CancellationToken,
) -> Self {
Self {
origin,
store: actor_context.store.clone(),
db: actor_context.db.clone(),
mode: ActorMode::NoViableNotes,
notify,
cancel_token,
block_producer: actor_context.block_producer.clone(),
validator: actor_context.validator.clone(),
prover: actor_context.prover.clone(),
chain_state: actor_context.chain_state.clone(),
script_cache: actor_context.script_cache.clone(),
max_notes_per_tx: actor_context.max_notes_per_tx,
max_note_attempts: actor_context.max_note_attempts,
idle_timeout: actor_context.idle_timeout,
request_tx: actor_context.request_tx.clone(),
max_cycles: actor_context.max_cycles,
}
}
/// Runs the actor's event loop until it is cancelled, idles out, or hits a fatal error.
///
/// The initial mode is derived from the DB: if notes are available for the account at the
/// current chain tip the actor starts in [`ActorMode::NotesAvailable`]. Each loop iteration
/// then waits for whichever of these happens first:
///
/// - the cancellation token fires: returns `Ok(())`;
/// - a notification arrives: the mode is re-evaluated;
/// - a semaphore permit is acquired (only in [`ActorMode::NotesAvailable`]): a candidate is
///   selected from the DB and executed while the permit is held;
/// - the idle timeout elapses (only in [`ActorMode::NoViableNotes`]): returns `Ok(())`.
///
/// # Errors
///
/// Returns an error if a DB query fails or if `semaphore` has been closed.
pub async fn run(mut self, semaphore: Arc<Semaphore>) -> anyhow::Result<()> {
    let account_id = self.origin.id();

    // Determine the initial mode by asking the DB whether any notes are available.
    let block_num = self.chain_state.read().await.chain_tip_header.block_num();
    let has_notes = self
        .db
        .has_available_notes(account_id, block_num, self.max_note_attempts)
        .await
        .context("failed to check for available notes")?;
    if has_notes {
        self.mode = ActorMode::NotesAvailable;
    }

    loop {
        // Transaction execution is only enabled while notes are available; in every other
        // mode this branch never resolves.
        let tx_permit_acquisition = match self.mode {
            ActorMode::NoViableNotes | ActorMode::TransactionInflight(_) => {
                std::future::pending().boxed()
            },
            ActorMode::NotesAvailable => semaphore.acquire().boxed(),
        };

        // The idle timer only runs while there is nothing to do. Both futures are rebuilt
        // on every iteration, so any wake-up restarts the timer from zero.
        let idle_timeout_sleep = match self.mode {
            ActorMode::NoViableNotes => tokio::time::sleep(self.idle_timeout).boxed(),
            ActorMode::NotesAvailable | ActorMode::TransactionInflight(_) => {
                std::future::pending().boxed()
            },
        };

        tokio::select! {
            _ = self.cancel_token.cancelled() => {
                return Ok(());
            }
            _ = self.notify.notified() => {
                match self.mode {
                    ActorMode::TransactionInflight(awaited_id) => {
                        // Keep waiting while the awaited transaction is still tracked in
                        // the DB; only once it is gone may the actor look for notes again.
                        // NOTE(review): assumes `Db::transaction_exists` reports whether
                        // the transaction is still pending — confirm against `Db`.
                        let exists = self
                            .db
                            .transaction_exists(awaited_id)
                            .await
                            .context("failed to check transaction status")?;
                        if !exists {
                            self.mode = ActorMode::NotesAvailable;
                        }
                    },
                    ActorMode::NoViableNotes | ActorMode::NotesAvailable => {
                        self.mode = ActorMode::NotesAvailable;
                    },
                }
            },
            permit = tx_permit_acquisition => {
                // The permit is held until the end of this branch, i.e. for the whole
                // candidate selection and execution.
                let _permit = permit.context("semaphore closed")?;
                let chain_state = self.chain_state.read().await.clone();
                let tx_candidate = self.select_candidate_from_db(
                    account_id,
                    chain_state,
                ).await?;
                if let Some(tx_candidate) = tx_candidate {
                    self.execute_transactions(account_id, tx_candidate).await;
                } else {
                    self.mode = ActorMode::NoViableNotes;
                }
            }
            _ = idle_timeout_sleep => {
                tracing::info!(%account_id, "Account actor deactivated due to idle timeout");
                return Ok(());
            }
        }
    }
}
/// Queries the DB for the account's latest state and its available notes and, if there is
/// anything to execute, assembles a [`TransactionCandidate`] against `chain_state`.
///
/// Returns `Ok(None)` when the account is not found in the DB or when no notes are available.
/// At most `max_notes_per_tx` notes are included in the candidate.
///
/// # Errors
///
/// Returns an error if the DB query fails.
async fn select_candidate_from_db(
    &self,
    account_id: NetworkAccountId,
    chain_state: ChainState,
) -> anyhow::Result<Option<TransactionCandidate>> {
    let tip = chain_state.chain_tip_header.block_num();
    let note_limit = self.max_notes_per_tx.get();

    let (latest_account, available_notes) = self
        .db
        .select_candidate(account_id, tip, self.max_note_attempts)
        .await
        .context("failed to query DB for transaction candidate")?;

    let Some(account) = latest_account else {
        tracing::info!(account_id = %account_id, "Account no longer exists in DB");
        return Ok(None);
    };

    // Cap the batch at the configured per-transaction limit.
    let notes: Vec<_> = available_notes.into_iter().take(note_limit).collect();
    if notes.is_empty() {
        return Ok(None);
    }

    let (chain_tip_header, chain_mmr) = chain_state.into_parts();
    let candidate = TransactionCandidate {
        account,
        notes,
        chain_tip_header,
        chain_mmr,
    };
    Ok(Some(candidate))
}
/// Executes `tx_candidate` and updates the actor's mode according to the outcome.
///
/// - On success, the returned note scripts are sent off for caching, any notes that failed
///   are reported via [`ActorRequest::NotesFailed`], and the actor switches to
///   [`ActorMode::TransactionInflight`] for the new transaction.
/// - On failure, the actor switches to [`ActorMode::NoViableNotes`] and every affected note is
///   reported as failed: with its individual error for `NtxError::AllNotesFailed`, otherwise
///   with the shared transaction error.
///
/// Errors are logged and reported, never returned.
#[tracing::instrument(name = "ntx.actor.execute_transactions", skip(self, tx_candidate))]
async fn execute_transactions(
    &mut self,
    account_id: NetworkAccountId,
    tx_candidate: TransactionCandidate,
) {
    let block_num = tx_candidate.chain_tip_header.block_num();
    let context = execute::NtxContext::new(
        self.block_producer.clone(),
        self.validator.clone(),
        self.prover.clone(),
        self.store.clone(),
        self.script_cache.clone(),
        self.db.clone(),
        self.max_cycles,
    );

    // The candidate is consumed by the execution below, so keep a copy of its notes for
    // logging and failure reporting.
    let notes = tx_candidate.notes.clone();
    // Shadows the parameter (which stays recorded on the tracing span) with the ID taken
    // from the candidate's account.
    let account_id = tx_candidate.account.id();
    let note_ids: Vec<_> = notes.iter().map(|n| n.to_inner().as_note().id()).collect();
    tracing::info!(
        %account_id,
        ?note_ids,
        num_notes = notes.len(),
        "executing network transaction",
    );

    let execution_result = context.execute_transaction(tx_candidate).await;
    match execution_result {
        Ok((tx_id, failed, scripts_to_cache)) => {
            // Only claim that notes failed when some actually did.
            if failed.is_empty() {
                tracing::info!(
                    %account_id,
                    %tx_id,
                    num_failed = failed.len(),
                    "network transaction executed",
                );
            } else {
                tracing::info!(
                    %account_id,
                    %tx_id,
                    num_failed = failed.len(),
                    "network transaction executed with some failed notes",
                );
            }
            self.cache_note_scripts(scripts_to_cache).await;
            if !failed.is_empty() {
                let failed_notes = log_failed_notes(failed);
                self.mark_notes_failed(&failed_notes, block_num).await;
            }
            self.mode = ActorMode::TransactionInflight(tx_id);
        },
        Err(err) => {
            let error_msg = err.as_report();
            tracing::error!(
                %account_id,
                ?note_ids,
                err = %error_msg,
                "network transaction failed",
            );
            self.mode = ActorMode::NoViableNotes;

            let failed_notes: Vec<_> = match err {
                // Per-note errors carry the specific reason each note failed.
                execute::NtxError::AllNotesFailed(per_note) => log_failed_notes(per_note),
                // Any other error applies to the transaction as a whole, so every note in
                // the batch is reported with the same shared error.
                other => {
                    let error: NoteError = Arc::new(other);
                    notes
                        .iter()
                        .map(|note| {
                            tracing::info!(
                                note.id = %note.to_inner().as_note().id(),
                                nullifier = %note.nullifier(),
                                err = %error_msg,
                                "note failed: transaction execution error",
                            );
                            (note.nullifier(), error.clone())
                        })
                        .collect()
                },
            };
            self.mark_notes_failed(&failed_notes, block_num).await;
        },
    }
}
/// Sends one [`ActorRequest::CacheNoteScript`] per script over the request channel.
///
/// Best effort: as soon as a send fails the remaining scripts are dropped, since the channel
/// will not accept them either.
async fn cache_note_scripts(&self, scripts: Vec<(Word, NoteScript)>) {
    for (script_root, script) in scripts {
        let request = ActorRequest::CacheNoteScript { script_root, script };
        if self.request_tx.send(request).await.is_err() {
            return;
        }
    }
}
/// Reports `failed_notes` via [`ActorRequest::NotesFailed`] and waits for the acknowledgement.
///
/// Best effort: if the request cannot be sent nothing is awaited, and a dropped
/// acknowledgement channel is ignored.
async fn mark_notes_failed(
    &self,
    failed_notes: &[(Nullifier, NoteError)],
    block_num: BlockNumber,
) {
    let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
    let request = ActorRequest::NotesFailed {
        failed_notes: failed_notes.to_vec(),
        block_num,
        ack_tx,
    };

    if self.request_tx.send(request).await.is_ok() {
        // Resume only after the receiver has acknowledged (or dropped) the request.
        let _ = ack_rx.await;
    }
}
}
fn log_failed_notes(failed: Vec<FailedNote>) -> Vec<(Nullifier, NoteError)> {
failed
.into_iter()
.map(|f| {
let error_msg = f.error.as_report();
tracing::info!(
note.id = %f.note.id(),
nullifier = %f.note.nullifier(),
err = %error_msg,
"note failed: consumability check",
);
(f.note.nullifier(), Arc::new(f.error) as NoteError)
})
.collect()
}