use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use actor::AccountActorContext;
use anyhow::Context;
use builder::MempoolEventStream;
use chain_state::ChainState;
use clients::{BlockProducerClient, StoreClient, ValidatorClient};
use coordinator::Coordinator;
use db::Db;
use futures::TryStreamExt;
use miden_node_utils::ErrorReport;
use miden_node_utils::lru_cache::LruCache;
use miden_remote_prover_client::RemoteTransactionProver;
use tokio::sync::{RwLock, mpsc};
use url::Url;
/// Shared, type-erased error report: a reference-counted `dyn ErrorReport` that is `Send + Sync`,
/// so it can be cloned cheaply and moved across threads.
// NOTE(review): the name suggests this carries note-related failures — confirm at the use sites.
pub(crate) type NoteError = Arc<dyn ErrorReport + Send + Sync>;
// Crate modules. The summaries below only list the items this file imports from each module.
// Provides `AccountActorContext`.
mod actor;
// Provides `NetworkTransactionBuilder` and the `MempoolEventStream` alias.
mod builder;
// Provides `ChainState`.
mod chain_state;
// Provides `StoreClient`, `BlockProducerClient` and `ValidatorClient`.
mod clients;
// Provides `Coordinator`.
mod coordinator;
// Provides `Db`.
pub(crate) mod db;
pub(crate) mod inflight_note;
pub mod server;
// Only compiled for test builds.
#[cfg(test)]
pub(crate) mod test_utils;
// Re-exported so the builder is reachable from the crate root.
pub use builder::NetworkTransactionBuilder;
/// Name identifying this component.
// NOTE(review): presumably used as a tracing/logging target — it is not referenced in the code
// visible here, confirm at the use sites.
const COMPONENT: &str = "miden-ntx-builder";
/// Default for [`NtxBuilderConfig::max_notes_per_tx`].
const DEFAULT_MAX_NOTES_PER_TX: NonZeroUsize = NonZeroUsize::new(20).expect("literal is non-zero");
// Compile-time guard: the default must not exceed `miden_tx::MAX_NUM_CHECKER_NOTES`, the same
// bound `NtxBuilderConfig::with_max_notes_per_tx` enforces at runtime.
const _: () = assert!(DEFAULT_MAX_NOTES_PER_TX.get() <= miden_tx::MAX_NUM_CHECKER_NOTES);
/// Default for [`NtxBuilderConfig::max_concurrent_txs`].
const DEFAULT_MAX_CONCURRENT_TXS: usize = 4;
/// Default for [`NtxBuilderConfig::max_block_count`].
const DEFAULT_MAX_BLOCK_COUNT: usize = 4;
/// Default for [`NtxBuilderConfig::account_channel_capacity`].
const DEFAULT_ACCOUNT_CHANNEL_CAPACITY: usize = 1_000;
/// Default for [`NtxBuilderConfig::max_note_attempts`].
const DEFAULT_MAX_NOTE_ATTEMPTS: usize = 30;
/// Default for [`NtxBuilderConfig::script_cache_size`].
const DEFAULT_SCRIPT_CACHE_SIZE: NonZeroUsize =
NonZeroUsize::new(1_000).expect("literal is non-zero");
/// Default for [`NtxBuilderConfig::idle_timeout`]: five minutes.
const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(5 * 60);
/// Default for [`NtxBuilderConfig::max_account_crashes`].
const DEFAULT_MAX_ACCOUNT_CRASHES: usize = 10;
/// Default for [`NtxBuilderConfig::max_cycles`]: `1 << 19` = 524 288.
const DEFAULT_MAX_TX_CYCLES: u32 = 1 << 19;
/// Configuration from which a [`NetworkTransactionBuilder`] is constructed.
///
/// Create it with [`NtxBuilderConfig::new`], adjust individual settings with the `with_*`
/// methods, and consume it with [`NtxBuilderConfig::build`].
#[derive(Debug, Clone)]
pub struct NtxBuilderConfig {
/// URL passed to `StoreClient::new` when building.
pub store_url: Url,
/// URL passed to `BlockProducerClient::new` when building; that client is also the one the
/// mempool event subscription is made on.
pub block_producer_url: Url,
/// URL passed to `ValidatorClient::new` when building.
pub validator_url: Url,
/// Optional URL of a remote transaction prover. When `Some`, `build` creates a
/// `RemoteTransactionProver` from it; when `None`, the actor context receives no prover.
// NOTE(review): presumably proving then happens locally — confirm in the actor module.
pub tx_prover_url: Option<Url>,
/// Size given to the `LruCache` used as the script cache.
pub script_cache_size: NonZeroUsize,
/// Passed to `Coordinator::new` as its first argument.
// NOTE(review): presumably caps the number of transactions in progress at once — confirm.
pub max_concurrent_txs: usize,
/// Passed to the account actor context. [`NtxBuilderConfig::with_max_notes_per_tx`] rejects
/// values above `miden_tx::MAX_NUM_CHECKER_NOTES`.
pub max_notes_per_tx: NonZeroUsize,
/// Passed to the account actor context.
// NOTE(review): presumably the number of failed executions after which a note is given up
// on — confirm in the actor module.
pub max_note_attempts: usize,
/// Not read by `build` itself; it reaches `NetworkTransactionBuilder::new` as part of the
/// config.
// NOTE(review): presumably bounds how many blocks of chain state are retained — confirm.
pub max_block_count: usize,
/// Not read by `build` itself; it reaches `NetworkTransactionBuilder::new` as part of the
/// config.
// NOTE(review): presumably the capacity of a channel used while loading accounts — confirm.
pub account_channel_capacity: usize,
/// Passed to the account actor context.
// NOTE(review): presumably how long an actor may stay idle before it is shut down — confirm.
pub idle_timeout: Duration,
/// Passed to `Coordinator::new` as its second argument.
// NOTE(review): presumably the crash budget per account actor — confirm in the coordinator.
pub max_account_crashes: usize,
/// Passed to the account actor context.
// NOTE(review): presumably the execution-cycle limit per network transaction — confirm.
pub max_cycles: u32,
/// Path passed to `Db::setup` when building.
pub database_filepath: PathBuf,
}
impl NtxBuilderConfig {
pub fn new(
store_url: Url,
block_producer_url: Url,
validator_url: Url,
database_filepath: PathBuf,
) -> Self {
Self {
store_url,
block_producer_url,
validator_url,
tx_prover_url: None,
script_cache_size: DEFAULT_SCRIPT_CACHE_SIZE,
max_concurrent_txs: DEFAULT_MAX_CONCURRENT_TXS,
max_notes_per_tx: DEFAULT_MAX_NOTES_PER_TX,
max_note_attempts: DEFAULT_MAX_NOTE_ATTEMPTS,
max_block_count: DEFAULT_MAX_BLOCK_COUNT,
account_channel_capacity: DEFAULT_ACCOUNT_CHANNEL_CAPACITY,
idle_timeout: DEFAULT_IDLE_TIMEOUT,
max_account_crashes: DEFAULT_MAX_ACCOUNT_CRASHES,
max_cycles: DEFAULT_MAX_TX_CYCLES,
database_filepath,
}
}
#[must_use]
pub fn with_tx_prover_url(mut self, url: Option<Url>) -> Self {
self.tx_prover_url = url;
self
}
#[must_use]
pub fn with_script_cache_size(mut self, size: NonZeroUsize) -> Self {
self.script_cache_size = size;
self
}
#[must_use]
pub fn with_max_concurrent_txs(mut self, max: usize) -> Self {
self.max_concurrent_txs = max;
self
}
#[must_use]
pub fn with_max_notes_per_tx(mut self, max: NonZeroUsize) -> Self {
assert!(
max.get() <= miden_tx::MAX_NUM_CHECKER_NOTES,
"max_notes_per_tx ({}) exceeds MAX_NUM_CHECKER_NOTES ({})",
max,
miden_tx::MAX_NUM_CHECKER_NOTES
);
self.max_notes_per_tx = max;
self
}
#[must_use]
pub fn with_max_note_attempts(mut self, max: usize) -> Self {
self.max_note_attempts = max;
self
}
#[must_use]
pub fn with_max_block_count(mut self, max: usize) -> Self {
self.max_block_count = max;
self
}
#[must_use]
pub fn with_account_channel_capacity(mut self, capacity: usize) -> Self {
self.account_channel_capacity = capacity;
self
}
#[must_use]
pub fn with_idle_timeout(mut self, timeout: Duration) -> Self {
self.idle_timeout = timeout;
self
}
#[must_use]
pub fn with_max_account_crashes(mut self, max: usize) -> Self {
self.max_account_crashes = max;
self
}
#[must_use]
pub fn with_max_cycles(mut self, max: u32) -> Self {
self.max_cycles = max;
self
}
pub async fn build(self) -> anyhow::Result<NetworkTransactionBuilder> {
let db = Db::setup(self.database_filepath.clone()).await?;
db.purge_inflight().await.context("failed to purge inflight state")?;
let script_cache = LruCache::new(self.script_cache_size);
let coordinator =
Coordinator::new(self.max_concurrent_txs, self.max_account_crashes, db.clone());
let store = StoreClient::new(self.store_url.clone());
let block_producer = BlockProducerClient::new(self.block_producer_url.clone());
let validator = ValidatorClient::new(self.validator_url.clone());
let prover = self.tx_prover_url.clone().map(RemoteTransactionProver::new);
let subscription = block_producer
.subscribe_to_mempool_with_retry()
.await
.map_err(|err| anyhow::anyhow!(err))
.context("failed to subscribe to mempool events")?;
let mempool_events: MempoolEventStream = Box::pin(subscription.into_stream());
let (chain_tip_header, chain_mmr) = store
.get_latest_blockchain_data_with_retry()
.await?
.context("store should contain a latest block")?;
db.upsert_chain_state(chain_tip_header.block_num(), chain_tip_header.clone())
.await
.context("failed to upsert chain state")?;
let chain_state = Arc::new(RwLock::new(ChainState::new(chain_tip_header, chain_mmr)));
let (request_tx, actor_request_rx) = mpsc::channel(1);
let actor_context = AccountActorContext {
block_producer: block_producer.clone(),
validator,
prover,
chain_state: chain_state.clone(),
store: store.clone(),
script_cache,
max_notes_per_tx: self.max_notes_per_tx,
max_note_attempts: self.max_note_attempts,
idle_timeout: self.idle_timeout,
db: db.clone(),
request_tx,
max_cycles: self.max_cycles,
};
Ok(NetworkTransactionBuilder::new(
self,
coordinator,
store,
db,
chain_state,
actor_context,
mempool_events,
actor_request_rx,
))
}
}