use std::num::{NonZeroU16, NonZeroUsize};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use builder::BlockStream;
use chain_state::SharedChainState;
use clients::RpcClient;
use db::Db;
use miden_node_utils::ErrorReport;
use miden_node_utils::lru_cache::LruCache;
use miden_protocol::block::{BlockNumber, SignedBlock};
use miden_remote_prover_client::RemoteTransactionProver;
use tokio::sync::mpsc;
use tonic::metadata::AsciiMetadataValue;
use url::Url;
use crate::actor::{AccountActorContext, ActorConfig, GrpcClients, State};
use crate::coordinator::Coordinator;
/// Shared, reference-counted handle to a type-erased error that implements [`ErrorReport`] and is
/// `Send + Sync`.
pub(crate) type NoteError = Arc<dyn ErrorReport + Send + Sync>;
mod actor;
mod builder;
mod chain_state;
mod clients;
mod committed_block;
mod coordinator;
pub(crate) mod db;
pub mod server;
#[cfg(test)]
pub(crate) mod test_utils;
pub use builder::NetworkTransactionBuilder;
pub async fn bootstrap(database_filepath: PathBuf, genesis: &SignedBlock) -> anyhow::Result<()> {
validate_genesis_block(genesis).context("genesis block validation failed")?;
db::Db::bootstrap(database_filepath, genesis).await
}
pub fn migrate(database_filepath: impl AsRef<Path>) -> anyhow::Result<()> {
db::Db::migrate(database_filepath).context("failed to apply ntx-builder database migrations")
}
fn validate_genesis_block(block: &SignedBlock) -> anyhow::Result<()> {
anyhow::ensure!(
block.header().block_num() == BlockNumber::GENESIS,
"expected genesis block number (0), got {}",
block.header().block_num(),
);
anyhow::ensure!(
block
.signature()
.verify(block.header().commitment(), block.header().validator_key()),
"genesis block signature verification failed",
);
Ok(())
}
#[cfg(test)]
mod bootstrap_tests {
    use super::*;

    /// The block returned by `test_utils::mock_genesis_block` must be rejected by
    /// `validate_genesis_block`, and the error text must mention the failed signature check.
    // NOTE(review): this relies on the mock block carrying a signature that does not verify —
    // confirm against `test_utils`.
    #[test]
    fn validate_genesis_block_rejects_invalid_signature() {
        let genesis = crate::test_utils::mock_genesis_block();

        let outcome = validate_genesis_block(&genesis);
        let err = outcome.expect_err("invalid signature should fail");

        let rendered = err.to_string();
        assert!(
            rendered.contains("signature verification failed"),
            "unexpected error: {err}",
        );
    }
}
/// Name of this component.
// NOTE(review): not referenced in the visible part of this file; presumably used as a
// tracing/log target elsewhere in the crate — confirm.
const COMPONENT: &str = "miden-ntx-builder";
/// Default for [`NtxBuilderConfig::max_notes_per_tx`].
const DEFAULT_MAX_NOTES_PER_TX: NonZeroUsize = NonZeroUsize::new(20).expect("literal is non-zero");
// Compile-time guard: the default must respect the same `miden_tx::MAX_NUM_CHECKER_NOTES` bound
// that `NtxBuilderConfig::with_max_notes_per_tx` asserts at runtime.
const _: () = assert!(DEFAULT_MAX_NOTES_PER_TX.get() <= miden_tx::MAX_NUM_CHECKER_NOTES);
/// Default for [`NtxBuilderConfig::max_concurrent_txs`].
const DEFAULT_MAX_CONCURRENT_TXS: usize = 4;
/// Default for [`NtxBuilderConfig::max_block_count`].
const DEFAULT_MAX_BLOCK_COUNT: usize = 4;
/// Default for [`NtxBuilderConfig::account_channel_capacity`].
const DEFAULT_ACCOUNT_CHANNEL_CAPACITY: usize = 1_000;
/// Default for [`NtxBuilderConfig::max_note_attempts`].
const DEFAULT_MAX_NOTE_ATTEMPTS: usize = 30;
/// Default for [`NtxBuilderConfig::script_cache_size`].
const DEFAULT_SCRIPT_CACHE_SIZE: NonZeroUsize =
    NonZeroUsize::new(1_000).expect("literal is non-zero");
/// Default for [`NtxBuilderConfig::idle_timeout`] (5 minutes).
const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(5 * 60);
/// Default for [`NtxBuilderConfig::max_account_crashes`].
const DEFAULT_MAX_ACCOUNT_CRASHES: usize = 10;
/// Default for [`NtxBuilderConfig::request_backoff_initial`] (100 ms).
const DEFAULT_REQUEST_BACKOFF_INITIAL: Duration = Duration::from_millis(100);
/// Default for [`NtxBuilderConfig::request_backoff_max`] (30 s).
const DEFAULT_REQUEST_BACKOFF_MAX: Duration = Duration::from_secs(30);
/// Default for [`NtxBuilderConfig::max_cycles`] (`1 << 19` = 524,288).
const DEFAULT_MAX_TX_CYCLES: u32 = 1 << 19;
/// Default for [`NtxBuilderConfig::tx_expiration_delta`].
// Uses `expect` with the same message as the other `NonZero*` defaults so that a zero literal
// fails compilation with an explanatory message rather than a bare `unwrap` panic.
const DEFAULT_TX_EXPIRATION_DELTA: NonZeroU16 =
    NonZeroU16::new(30).expect("literal is non-zero");
/// Configuration from which [`NtxBuilderConfig::build`] assembles a
/// [`NetworkTransactionBuilder`].
///
/// Create one with [`NtxBuilderConfig::new`] (which fills in the `DEFAULT_*` values) and adjust
/// individual settings with the `with_*` methods.
#[derive(Debug, Clone)]
pub struct NtxBuilderConfig {
/// URL passed to the `RpcClient` constructor in `build`.
pub rpc_url: Url,
/// Optional metadata value. When `Some`, `build` creates the RPC client through
/// `RpcClient::new_with_auth`; when `None`, through `RpcClient::new`.
pub rpc_auth_header: Option<AsciiMetadataValue>,
/// URL whose string form is passed to `RemoteTransactionProver::new`.
pub tx_prover_url: Url,
/// Size passed to `LruCache::new` for the script cache stored in the actor state.
pub script_cache_size: NonZeroUsize,
/// Passed as the first argument to `Coordinator::new`.
pub max_concurrent_txs: usize,
/// Copied into `ActorConfig::max_notes_per_tx`. The `with_max_notes_per_tx` setter asserts
/// that it does not exceed `miden_tx::MAX_NUM_CHECKER_NOTES`.
pub max_notes_per_tx: NonZeroUsize,
/// Copied into `ActorConfig::max_note_attempts`.
pub max_note_attempts: usize,
/// Not read in the visible part of this file.
// NOTE(review): presumably consumed by `NetworkTransactionBuilder`, which receives the whole
// config from `build` — confirm.
pub max_block_count: usize,
/// Buffer size of the `tokio::sync::mpsc` channel that carries `actor::ActorRequest`s. Must be
/// greater than zero, because `mpsc::channel` does not accept a zero capacity.
pub account_channel_capacity: usize,
/// Copied into `ActorConfig::idle_timeout`.
pub idle_timeout: Duration,
/// Passed as the second argument to `Coordinator::new`.
pub max_account_crashes: usize,
/// Copied into `ActorConfig::max_cycles`.
pub max_cycles: u32,
/// Copied into `ActorConfig::tx_expiration_delta` and passed to
/// `actor::expiration_tx_script` when the actor state is created.
pub tx_expiration_delta: NonZeroU16,
/// Passed to the `RpcClient` constructor and copied into
/// `ActorConfig::request_backoff_initial`.
pub request_backoff_initial: Duration,
/// Passed to the `RpcClient` constructor and copied into `ActorConfig::request_backoff_max`.
pub request_backoff_max: Duration,
/// Path of the database that `build` opens with `Db::load_with_pool_size`.
pub database_filepath: PathBuf,
/// Pool size passed to `Db::load_with_pool_size`. `build` rejects values below 2.
pub sqlite_connection_pool_size: NonZeroUsize,
}
impl NtxBuilderConfig {
/// Creates a configuration from the three required values.
///
/// Every other field starts at its `DEFAULT_*` constant, except that no RPC auth header is set
/// and the SQLite pool size comes from `miden_node_db::default_connection_pool_size()`.
pub fn new(rpc_url: Url, tx_prover_url: Url, database_filepath: PathBuf) -> Self {
    let sqlite_connection_pool_size = miden_node_db::default_connection_pool_size();

    Self {
        // Caller-supplied values.
        rpc_url,
        tx_prover_url,
        database_filepath,

        // Unset until `with_rpc_auth_header` is called.
        rpc_auth_header: None,

        // Limits and sizes.
        script_cache_size: DEFAULT_SCRIPT_CACHE_SIZE,
        max_concurrent_txs: DEFAULT_MAX_CONCURRENT_TXS,
        max_notes_per_tx: DEFAULT_MAX_NOTES_PER_TX,
        max_note_attempts: DEFAULT_MAX_NOTE_ATTEMPTS,
        max_block_count: DEFAULT_MAX_BLOCK_COUNT,
        account_channel_capacity: DEFAULT_ACCOUNT_CHANNEL_CAPACITY,
        max_account_crashes: DEFAULT_MAX_ACCOUNT_CRASHES,
        max_cycles: DEFAULT_MAX_TX_CYCLES,
        sqlite_connection_pool_size,

        // Timing.
        idle_timeout: DEFAULT_IDLE_TIMEOUT,
        tx_expiration_delta: DEFAULT_TX_EXPIRATION_DELTA,
        request_backoff_initial: DEFAULT_REQUEST_BACKOFF_INITIAL,
        request_backoff_max: DEFAULT_REQUEST_BACKOFF_MAX,
    }
}
#[must_use]
pub fn with_rpc_auth_header(mut self, value: AsciiMetadataValue) -> Self {
self.rpc_auth_header = Some(value);
self
}
#[must_use]
pub fn with_script_cache_size(mut self, size: NonZeroUsize) -> Self {
self.script_cache_size = size;
self
}
#[must_use]
pub fn with_max_concurrent_txs(mut self, max: usize) -> Self {
self.max_concurrent_txs = max;
self
}
#[must_use]
pub fn with_max_notes_per_tx(mut self, max: NonZeroUsize) -> Self {
assert!(
max.get() <= miden_tx::MAX_NUM_CHECKER_NOTES,
"max_notes_per_tx ({}) exceeds MAX_NUM_CHECKER_NOTES ({})",
max,
miden_tx::MAX_NUM_CHECKER_NOTES
);
self.max_notes_per_tx = max;
self
}
#[must_use]
pub fn with_max_note_attempts(mut self, max: usize) -> Self {
self.max_note_attempts = max;
self
}
#[must_use]
pub fn with_max_block_count(mut self, max: usize) -> Self {
self.max_block_count = max;
self
}
#[must_use]
pub fn with_account_channel_capacity(mut self, capacity: usize) -> Self {
self.account_channel_capacity = capacity;
self
}
#[must_use]
pub fn with_idle_timeout(mut self, timeout: Duration) -> Self {
self.idle_timeout = timeout;
self
}
#[must_use]
pub fn with_max_account_crashes(mut self, max: usize) -> Self {
self.max_account_crashes = max;
self
}
#[must_use]
pub fn with_max_cycles(mut self, max: u32) -> Self {
self.max_cycles = max;
self
}
#[must_use]
pub fn with_tx_expiration_delta(mut self, delta: NonZeroU16) -> Self {
self.tx_expiration_delta = delta;
self
}
#[must_use]
pub fn with_request_backoff(mut self, initial: Duration, max: Duration) -> Self {
self.request_backoff_initial = initial;
self.request_backoff_max = max;
self
}
#[must_use]
pub fn with_sqlite_connection_pool_size(mut self, size: NonZeroUsize) -> Self {
self.sqlite_connection_pool_size = size;
self
}
/// Consumes the configuration and assembles a [`NetworkTransactionBuilder`].
///
/// Steps, in order:
/// 1. reject a SQLite pool size below 2;
/// 2. open the database with `Db::load_with_pool_size`;
/// 3. read the genesis commitment from the database;
/// 4. create the `RpcClient`, with or without an auth header;
/// 5. read the stored chain state (last applied block, header, MMR);
/// 6. subscribe to committed blocks starting at the child of the last applied block;
/// 7. build the coordinator and pass everything to `NetworkTransactionBuilder::new`.
///
/// # Errors
///
/// Returns an error if the pool size is below 2, if any database read fails, if the database
/// holds no chain state, if the RPC client cannot be created, if the block subscription cannot
/// be opened, or if `build_coordinator` fails.
pub async fn build(self) -> anyhow::Result<NetworkTransactionBuilder> {
    let pool_size = self.sqlite_connection_pool_size;
    anyhow::ensure!(
        pool_size.get() >= 2,
        "sqlite connection pool size must be at least 2 (the event loop pins one connection)",
    );

    // `self` is moved into the builder at the very end, so owned config values needed before
    // then are cloned.
    let db = Db::load_with_pool_size(self.database_filepath.clone(), pool_size).await?;

    let genesis_commitment = db
        .get_genesis_commitment()
        .await
        .context("failed to read genesis commitment; run `miden-ntx-builder bootstrap` first")?;

    let rpc_result = if let Some(auth_header) = self.rpc_auth_header.clone() {
        RpcClient::new_with_auth(
            self.rpc_url.clone(),
            Some(auth_header),
            genesis_commitment,
            self.request_backoff_initial,
            self.request_backoff_max,
        )
    } else {
        RpcClient::new(
            self.rpc_url.clone(),
            genesis_commitment,
            self.request_backoff_initial,
            self.request_backoff_max,
        )
    };
    let rpc = rpc_result?;

    // First `context`: the read itself failed. Second `context`: the read succeeded but the
    // database holds no chain state yet.
    let stored_state = db.get_chain_state().await.context("failed to read chain state")?;
    let (last_applied_block, header, mmr) = stored_state.context(
        "ntx-builder database has not been bootstrapped; \
         run `miden-ntx-builder bootstrap` first",
    )?;

    // The subscription starts at the block following the last one already applied.
    let block_from = last_applied_block.child();
    tracing::info!(%block_from, "ntx-builder opening committed-block subscription");

    let subscription = rpc.block_subscription_with_retry(block_from).await;
    let raw_stream = subscription
        .map_err(|source| anyhow::anyhow!(source))
        .context("failed to subscribe to committed blocks")?;
    let block_stream: BlockStream = Box::pin(raw_stream);

    let chain = Arc::new(SharedChainState::new(header, mmr));
    let (coordinator, actor_request_rx) =
        self.build_coordinator(rpc, db.clone(), Arc::clone(&chain))?;

    let builder = NetworkTransactionBuilder::new(
        self,
        db,
        block_stream,
        last_applied_block,
        chain,
        coordinator,
        actor_request_rx,
    );
    Ok(builder)
}
/// Creates the actor-request channel, the shared [`AccountActorContext`], and the
/// [`Coordinator`] that owns that context.
///
/// Returns the coordinator together with the receiving end of the actor-request channel; the
/// sending end is stored in the actor context.
///
/// # Errors
///
/// - `account_channel_capacity` is zero. `tokio::sync::mpsc::channel` panics on a zero
///   capacity, so this is reported as a configuration error instead.
/// - `actor::expiration_tx_script` fails for the configured `tx_expiration_delta`.
fn build_coordinator(
    &self,
    rpc: RpcClient,
    db: Db,
    chain: Arc<SharedChainState>,
) -> anyhow::Result<(Coordinator, mpsc::Receiver<actor::ActorRequest>)> {
    // The capacity is a plain `usize` that callers can set freely; guard it before it reaches
    // `mpsc::channel`, which would otherwise abort the process with a panic.
    anyhow::ensure!(
        self.account_channel_capacity > 0,
        "account channel capacity must be greater than zero",
    );
    let (request_tx, actor_request_rx) = mpsc::channel(self.account_channel_capacity);

    let actor_context = AccountActorContext {
        clients: GrpcClients {
            rpc,
            prover: RemoteTransactionProver::new(self.tx_prover_url.as_str()),
        },
        state: State {
            db,
            chain,
            script_cache: LruCache::new(self.script_cache_size),
            expiration_script: actor::expiration_tx_script(self.tx_expiration_delta)
                .context("failed to compile network-tx expiration script")?,
        },
        config: ActorConfig {
            max_notes_per_tx: self.max_notes_per_tx,
            max_note_attempts: self.max_note_attempts,
            idle_timeout: self.idle_timeout,
            max_cycles: self.max_cycles,
            tx_expiration_delta: self.tx_expiration_delta,
            request_backoff_initial: self.request_backoff_initial,
            request_backoff_max: self.request_backoff_max,
        },
        request_tx,
    };

    let coordinator =
        Coordinator::new(self.max_concurrent_txs, self.max_account_crashes, actor_context);
    Ok((coordinator, actor_request_rx))
}
}