mod data_store;
mod error;
use alloc::collections::{BTreeMap, BTreeSet};
use alloc::sync::Arc;
use alloc::vec::Vec;
pub(crate) use data_store::InMemoryBatchDataStore;
pub use error::BatchBuilderError;
use miden_protocol::MIN_PROOF_SECURITY_LEVEL;
use miden_protocol::account::{Account, AccountId};
use miden_protocol::batch::ProposedBatch;
use miden_protocol::block::{BlockHeader, BlockNumber};
use miden_protocol::note::NoteId;
use miden_protocol::transaction::{PartialBlockchain, ProvenTransaction, TransactionInputs};
use miden_tx::auth::TransactionAuthenticator;
use miden_tx_batch_prover::LocalBatchProver;
use crate::store::data_store::build_partial_mmr_with_paths;
use crate::transaction::{
TransactionRequest,
TransactionResult,
TransactionStoreUpdate,
validate_executed_transaction,
};
use crate::{Client, ClientError};
/// A transaction that has been executed and proven by `BatchBuilder::push` and is waiting to be
/// submitted as part of a batch.
pub(crate) struct PushedTx {
/// The proven transaction; handed to `ProposedBatch::new` when the batch is submitted.
pub(crate) proven_tx: Arc<ProvenTransaction>,
/// Inputs of the executed transaction; forwarded to the RPC API together with the batch.
pub(crate) transaction_inputs: TransactionInputs,
/// Local execution result; used after submission to build this transaction's store update.
pub(crate) tx_result: TransactionResult,
}
/// Accumulates transactions and submits them together as one proven batch.
///
/// Transactions are added with `push` and the batch is sent with `submit`; both methods take the
/// builder by value.
pub struct BatchBuilder<'c, AUTH> {
/// Client used to execute and prove transactions, to reach the store, and to call the RPC API.
pub(crate) client: &'c Client<AUTH>,
/// In-memory data store handed to the executor for every pushed transaction. It also caches
/// the account produced by each executed transaction so the next one starts from that state.
pub(crate) data_store: InMemoryBatchDataStore,
/// Transactions pushed so far, in push order.
pub(crate) pushed_txs: Vec<PushedTx>,
/// IDs of the input notes consumed by the pushed transactions; `push` uses this set to reject
/// a request that lists one of them again.
pub(crate) consumed_input_notes: BTreeSet<NoteId>,
}
impl<AUTH> BatchBuilder<'_, AUTH> {
/// Returns the number of transactions pushed into this batch so far.
pub fn len(&self) -> usize {
self.pushed_txs.len()
}
/// Returns `true` if no transaction has been pushed yet.
pub fn is_empty(&self) -> bool {
self.pushed_txs.is_empty()
}
}
impl<AUTH> BatchBuilder<'_, AUTH>
where
AUTH: TransactionAuthenticator + Sync + 'static,
{
pub async fn submit(self) -> Result<BlockNumber, ClientError> {
let ref_block_num = self
.pushed_txs
.iter()
.map(|p| p.proven_tx.ref_block_num())
.max()
.ok_or(BatchBuilderError::Empty)?;
let lower_refs: BTreeSet<BlockNumber> = self
.pushed_txs
.iter()
.map(|p| p.proven_tx.ref_block_num())
.filter(|&r| r < ref_block_num)
.collect();
let store = self.client.store.clone();
let (ref_block_header, _) = store
.get_block_header_by_num(ref_block_num)
.await
.map_err(ClientError::StoreError)?
.ok_or_else(|| {
ClientError::StoreError(crate::store::StoreError::BlockHeaderNotFound(
ref_block_num,
))
})?;
let fetched =
store.get_block_headers(&lower_refs).await.map_err(ClientError::StoreError)?;
let authenticated_blocks: Vec<BlockHeader> =
fetched.into_iter().map(|(header, _)| header).collect();
let fetched_nums: BTreeSet<BlockNumber> =
authenticated_blocks.iter().map(BlockHeader::block_num).collect();
if let Some(&missing) = lower_refs.difference(&fetched_nums).next() {
return Err(ClientError::StoreError(crate::store::StoreError::BlockHeaderNotFound(
missing,
)));
}
let current_peaks =
store.get_current_blockchain_peaks().await.map_err(ClientError::StoreError)?;
let partial_mmr =
build_partial_mmr_with_paths(&store, current_peaks, &authenticated_blocks).await?;
let partial_blockchain = PartialBlockchain::new(partial_mmr, authenticated_blocks)?;
let len = self.pushed_txs.len();
let mut proven_txs: Vec<Arc<ProvenTransaction>> = Vec::with_capacity(len);
let mut transaction_inputs: Vec<TransactionInputs> = Vec::with_capacity(len);
let mut tx_results: Vec<TransactionResult> = Vec::with_capacity(len);
for pushed in self.pushed_txs {
proven_txs.push(pushed.proven_tx);
transaction_inputs.push(pushed.transaction_inputs);
tx_results.push(pushed.tx_result);
}
let unauthenticated_note_proofs = BTreeMap::new();
let proposed_batch = ProposedBatch::new(
proven_txs,
ref_block_header,
partial_blockchain,
unauthenticated_note_proofs,
)?;
let proven_batch =
LocalBatchProver::new(MIN_PROOF_SECURITY_LEVEL).prove(proposed_batch.clone())?;
let mut updates: Vec<TransactionStoreUpdate> = Vec::with_capacity(len);
let block_num = self
.client
.rpc_api
.submit_proven_batch(proven_batch, proposed_batch, transaction_inputs)
.await?;
for tx_result in &tx_results {
let update =
self.client.get_transaction_store_update(tx_result, block_num).await.map_err(
|source| BatchBuilderError::BatchSubmittedButUpdateBuildFailed {
block_num,
source,
},
)?;
updates.push(update);
}
if let Err(source) = self.client.store.apply_transaction_batch(updates).await {
return Err(ClientError::from(BatchBuilderError::BatchSubmittedButApplyFailed {
block_num,
source,
}));
}
Ok(block_num)
}
/// Executes and proves `req` against `account_id`, then appends the result to the batch.
///
/// The builder is taken by value and returned on success, so calls can be chained. On error
/// the builder is dropped together with the error.
///
/// # Errors
///
/// - [`BatchBuilderError::DuplicateInputNote`] (converted into a [`ClientError`]) if `req`
///   lists an input note already recorded as consumed by a previously pushed transaction. This
///   is checked before anything is executed.
/// - Any error returned while executing the transaction or by `Client::prove_transaction`.
pub async fn push(
    mut self,
    account_id: AccountId,
    req: TransactionRequest,
) -> Result<Self, ClientError> {
    // Reject the request up front if it tries to spend a note an earlier transaction in this
    // batch already consumed.
    for note_id in req.input_note_ids() {
        if self.consumed_input_notes.contains(&note_id) {
            return Err(ClientError::from(BatchBuilderError::DuplicateInputNote(note_id)));
        }
    }

    let tx_result =
        execute_transaction_for_batch(self.client, &mut self.data_store, account_id, req)
            .await?;
    let tx_inputs = tx_result.executed_transaction().tx_inputs().clone();
    let proven_tx = self.client.prove_transaction(&tx_result).await?;

    // Record the notes only after execution and proving succeeded, using the notes the
    // transaction actually consumed.
    self.consumed_input_notes
        .extend(tx_result.consumed_notes().iter().map(|note| note.id()));

    self.pushed_txs.push(PushedTx {
        proven_tx: Arc::new(proven_tx),
        transaction_inputs: tx_inputs,
        tx_result,
    });

    Ok(self)
}
}
/// Executes `transaction_request` for `account_id` using the batch's in-memory data store.
///
/// The account is taken from `data_store` when it is cached there. Otherwise it is loaded from
/// the client's store, and a locked account is rejected. Once the transaction has executed and
/// passed validation, its account delta is applied and the resulting account is cached in
/// `data_store`.
///
/// # Errors
///
/// - [`ClientError::AccountDataNotFound`] if the account is neither cached nor in the store.
/// - [`ClientError::AccountLocked`] if the account loaded from the store is locked.
/// - [`ClientError::AccountError`] if the transaction's delta cannot be applied to the account.
/// - Any error from preparing, executing, or validating the transaction, or from building the
///   [`TransactionResult`].
async fn execute_transaction_for_batch<AUTH>(
    client: &Client<AUTH>,
    data_store: &mut InMemoryBatchDataStore,
    account_id: AccountId,
    transaction_request: TransactionRequest,
) -> Result<TransactionResult, ClientError>
where
    AUTH: TransactionAuthenticator + Sync + 'static,
{
    // Prefer the account cached by an earlier transaction of this batch; fall back to the
    // client's store otherwise.
    let mut account = if let Some(account) = data_store.get_account(account_id) {
        account.clone()
    } else {
        let record = client
            .store
            .get_account(account_id)
            .await?
            .ok_or(ClientError::AccountDataNotFound(account_id))?;
        if record.is_locked() {
            return Err(ClientError::AccountLocked(account_id));
        }
        // The annotation selects the `TryInto` target type.
        let account: Account = record.try_into()?;
        account
    };
    // From here on use the ID reported by the resolved account.
    let account_id = account.id();

    let prep = client.prepare_transaction(&account, transaction_request).await?;

    // Make everything the executor may need to resolve available through the data store.
    data_store.register_note_scripts(prep.output_note_scripts());
    for fpi_account in &prep.foreign_account_inputs {
        data_store.mast_store().load_account_code(fpi_account.code());
    }
    data_store.register_foreign_account_inputs(prep.foreign_account_inputs);
    data_store.mast_store().load_account_code(account.code());

    let mut notes = prep.notes;
    if prep.ignore_invalid_notes {
        notes = client
            .get_valid_input_notes(&account, notes, prep.tx_args.clone(), &prep.output_recipients)
            .await?;
    }

    let executed_transaction = client
        .build_executor(data_store)?
        .execute_transaction(account_id, prep.block_num, notes, prep.tx_args)
        .await?;

    // Validate before mutating `account` or `data_store`, so a transaction rejected here never
    // leaves its post-state cached in the data store.
    validate_executed_transaction(&executed_transaction, &prep.output_recipients)?;

    account
        .apply_delta(executed_transaction.account_delta())
        .map_err(ClientError::AccountError)?;
    data_store.cache_account(account_id, account);

    TransactionResult::new(executed_transaction, prep.future_notes)
}