use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;
use actor::AccountActorContext;
use anyhow::Context;
use block_producer::BlockProducerClient;
use builder::{ChainState, MempoolEventStream};
use coordinator::Coordinator;
use db::Db;
use futures::TryStreamExt;
use miden_node_utils::lru_cache::LruCache;
use store::StoreClient;
use tokio::sync::{RwLock, mpsc};
use url::Url;
// Submodules making up this component. All are private to this file's module except `db`,
// which is visible crate-wide.
mod actor;
mod block_producer;
mod builder;
mod coordinator;
pub(crate) mod db;
mod store;
// Public re-export of the type produced by `NtxBuilderConfig::build`.
pub use builder::NetworkTransactionBuilder;
/// Name of this component.
///
/// Not referenced in the visible part of this file; NOTE(review): presumably used as a
/// tracing/logging target by the submodules — confirm.
const COMPONENT: &str = "miden-ntx-builder";
/// Default for [`NtxBuilderConfig::max_notes_per_tx`].
const DEFAULT_MAX_NOTES_PER_TX: NonZeroUsize = NonZeroUsize::new(20).expect("literal is non-zero");
// Compile-time guard: the default above must not exceed the limit that
// `NtxBuilderConfig::with_max_notes_per_tx` enforces at runtime.
const _: () = assert!(DEFAULT_MAX_NOTES_PER_TX.get() <= miden_tx::MAX_NUM_CHECKER_NOTES);
/// Default for [`NtxBuilderConfig::max_concurrent_txs`].
const DEFAULT_MAX_CONCURRENT_TXS: usize = 4;
/// Default for [`NtxBuilderConfig::max_block_count`].
const DEFAULT_MAX_BLOCK_COUNT: usize = 4;
/// Default for [`NtxBuilderConfig::account_channel_capacity`].
const DEFAULT_ACCOUNT_CHANNEL_CAPACITY: usize = 1_000;
/// Default for [`NtxBuilderConfig::actor_channel_size`].
const DEFAULT_ACTOR_CHANNEL_SIZE: usize = 100;
/// Default for [`NtxBuilderConfig::max_note_attempts`].
const DEFAULT_MAX_NOTE_ATTEMPTS: usize = 30;
/// Default for [`NtxBuilderConfig::script_cache_size`].
const DEFAULT_SCRIPT_CACHE_SIZE: NonZeroUsize =
NonZeroUsize::new(1_000).expect("literal is non-zero");
/// Configuration from which a [`NetworkTransactionBuilder`] is built.
///
/// Create it with [`NtxBuilderConfig::new`], adjust optional settings with the `with_*`
/// methods, then call [`NtxBuilderConfig::build`]. The whole configuration is also handed to
/// the resulting builder.
#[derive(Debug, Clone)]
pub struct NtxBuilderConfig {
/// URL the store client is created from in `build`.
pub store_url: Url,
/// URL the block-producer client is created from in `build`; also forwarded to the account
/// actor context.
pub block_producer_url: Url,
/// Validator URL, forwarded to the account actor context.
pub validator_url: Url,
/// Optional transaction prover URL, forwarded to the account actor context. `None` by
/// default.
///
/// NOTE(review): presumably `None` means transactions are proven locally — confirm.
pub tx_prover_url: Option<Url>,
/// Capacity of the LRU script cache created in `build`.
/// Defaults to `DEFAULT_SCRIPT_CACHE_SIZE`.
pub script_cache_size: NonZeroUsize,
/// Passed to the coordinator as its first constructor argument.
/// Defaults to `DEFAULT_MAX_CONCURRENT_TXS`.
pub max_concurrent_txs: usize,
/// Forwarded to the account actor context. Defaults to `DEFAULT_MAX_NOTES_PER_TX`.
///
/// `with_max_notes_per_tx` rejects values above `miden_tx::MAX_NUM_CHECKER_NOTES`; writing
/// the public field directly bypasses that check.
pub max_notes_per_tx: NonZeroUsize,
/// Forwarded to the account actor context. Defaults to `DEFAULT_MAX_NOTE_ATTEMPTS`.
pub max_note_attempts: usize,
/// Defaults to `DEFAULT_MAX_BLOCK_COUNT`.
///
/// Not read directly in `build`; NOTE(review): presumably consumed by the builder that
/// receives this configuration — confirm.
pub max_block_count: usize,
/// Defaults to `DEFAULT_ACCOUNT_CHANNEL_CAPACITY`.
///
/// Not read directly in `build`; NOTE(review): presumably consumed by the builder that
/// receives this configuration — confirm.
pub account_channel_capacity: usize,
/// Passed to the coordinator as its second constructor argument.
/// Defaults to `DEFAULT_ACTOR_CHANNEL_SIZE`.
pub actor_channel_size: usize,
/// Path handed to `Db::setup` when the database is set up in `build`.
pub database_filepath: PathBuf,
}
impl NtxBuilderConfig {
    /// Creates a configuration from the required endpoints and the database path.
    ///
    /// All remaining settings start at their `DEFAULT_*` values and no transaction prover URL
    /// is set. Use the `with_*` methods to override them.
    pub fn new(
        store_url: Url,
        block_producer_url: Url,
        validator_url: Url,
        database_filepath: PathBuf,
    ) -> Self {
        Self {
            // Caller-supplied values.
            store_url,
            block_producer_url,
            validator_url,
            database_filepath,
            // Everything else starts at its default.
            tx_prover_url: None,
            script_cache_size: DEFAULT_SCRIPT_CACHE_SIZE,
            max_concurrent_txs: DEFAULT_MAX_CONCURRENT_TXS,
            max_notes_per_tx: DEFAULT_MAX_NOTES_PER_TX,
            max_note_attempts: DEFAULT_MAX_NOTE_ATTEMPTS,
            max_block_count: DEFAULT_MAX_BLOCK_COUNT,
            account_channel_capacity: DEFAULT_ACCOUNT_CHANNEL_CAPACITY,
            actor_channel_size: DEFAULT_ACTOR_CHANNEL_SIZE,
        }
    }

    /// Sets (or clears, with `None`) the transaction prover URL.
    #[must_use]
    pub fn with_tx_prover_url(self, url: Option<Url>) -> Self {
        Self { tx_prover_url: url, ..self }
    }

    /// Sets the capacity of the script cache.
    #[must_use]
    pub fn with_script_cache_size(self, size: NonZeroUsize) -> Self {
        Self { script_cache_size: size, ..self }
    }

    /// Sets the `max_concurrent_txs` value handed to the coordinator.
    #[must_use]
    pub fn with_max_concurrent_txs(self, max: usize) -> Self {
        Self { max_concurrent_txs: max, ..self }
    }

    /// Sets the `max_notes_per_tx` value handed to the account actors.
    ///
    /// # Panics
    ///
    /// Panics if `max` is greater than `miden_tx::MAX_NUM_CHECKER_NOTES`.
    #[must_use]
    pub fn with_max_notes_per_tx(self, max: NonZeroUsize) -> Self {
        let limit = miden_tx::MAX_NUM_CHECKER_NOTES;
        assert!(
            max.get() <= limit,
            "max_notes_per_tx ({}) exceeds MAX_NUM_CHECKER_NOTES ({})",
            max,
            limit
        );
        Self { max_notes_per_tx: max, ..self }
    }

    /// Sets the `max_note_attempts` value handed to the account actors.
    #[must_use]
    pub fn with_max_note_attempts(self, max: usize) -> Self {
        Self { max_note_attempts: max, ..self }
    }

    /// Sets the `max_block_count` value.
    #[must_use]
    pub fn with_max_block_count(self, max: usize) -> Self {
        Self { max_block_count: max, ..self }
    }

    /// Sets the `account_channel_capacity` value.
    #[must_use]
    pub fn with_account_channel_capacity(self, capacity: usize) -> Self {
        Self { account_channel_capacity: capacity, ..self }
    }

    /// Sets the `actor_channel_size` value handed to the coordinator.
    #[must_use]
    pub fn with_actor_channel_size(self, size: usize) -> Self {
        Self { actor_channel_size: size, ..self }
    }

    /// Consumes the configuration and assembles a [`NetworkTransactionBuilder`].
    ///
    /// Steps, in order:
    /// 1. Set up the database at `database_filepath` and purge its inflight state.
    /// 2. Create the script cache, coordinator, store client and block-producer client.
    /// 3. Fetch the latest chain tip from the store and subscribe to mempool events at that
    ///    tip, repeating both when the subscription is rejected with `InvalidArgument`.
    /// 4. Persist the chain tip and build the shared chain state and account actor context.
    ///
    /// # Errors
    ///
    /// Returns an error if
    /// - the database cannot be set up or its inflight state cannot be purged,
    /// - the latest blockchain data cannot be fetched, or the store has no latest block,
    /// - the mempool subscription fails with any code other than `InvalidArgument`,
    /// - the chain state cannot be written to the database.
    pub async fn build(self) -> anyhow::Result<NetworkTransactionBuilder> {
        let db = Db::setup(self.database_filepath.clone()).await?;
        // NOTE(review): presumably clears state left behind by an earlier run — confirm.
        db.purge_inflight().await.context("failed to purge inflight state")?;

        let scripts = LruCache::new(self.script_cache_size);
        let coordinator =
            Coordinator::new(self.max_concurrent_txs, self.actor_channel_size, db.clone());

        let store = StoreClient::new(self.store_url.clone());
        let block_producer = BlockProducerClient::new(self.block_producer_url.clone());

        // The tip is re-fetched on every iteration so that a retry subscribes at whatever tip
        // the store reports at that moment.
        let (chain_tip_header, chain_mmr, mempool_events) = loop {
            let (tip, mmr) = store
                .get_latest_blockchain_data_with_retry()
                .await?
                .context("store should contain a latest block")?;

            let subscribed = block_producer.subscribe_to_mempool_with_retry(tip.block_num()).await;
            let status = match subscribed {
                Ok(subscription) => {
                    let events: MempoolEventStream = Box::pin(subscription.into_stream());
                    break (tip, mmr, events);
                },
                Err(status) => status,
            };

            // Only `InvalidArgument` is retried; every other failure is fatal.
            if status.code() != tonic::Code::InvalidArgument {
                return Err(status).context("failed to subscribe to mempool events");
            }
            tracing::warn!(
                err = %status,
                "mempool subscription failed due to chain tip desync, retrying"
            );
        };

        let tip_block_num = chain_tip_header.block_num();
        db.upsert_chain_state(tip_block_num, chain_tip_header.clone())
            .await
            .context("failed to upsert chain state")?;

        let chain_state = Arc::new(RwLock::new(ChainState::new(chain_tip_header, chain_mmr)));
        let (notification_tx, notification_rx) = mpsc::channel(1);

        let actor_context = AccountActorContext {
            block_producer_url: self.block_producer_url.clone(),
            validator_url: self.validator_url.clone(),
            tx_prover_url: self.tx_prover_url.clone(),
            chain_state: Arc::clone(&chain_state),
            store: store.clone(),
            script_cache: scripts,
            max_notes_per_tx: self.max_notes_per_tx,
            max_note_attempts: self.max_note_attempts,
            db: db.clone(),
            notification_tx,
        };

        Ok(NetworkTransactionBuilder::new(
            self,
            coordinator,
            store,
            db,
            chain_state,
            actor_context,
            mempool_events,
            notification_rx,
        ))
    }
}