miden-node-ntx-builder 0.14.4

Miden node's network transaction builder component
Documentation
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use actor::AccountActorContext;
use anyhow::Context;
use builder::MempoolEventStream;
use chain_state::ChainState;
use clients::{BlockProducerClient, StoreClient, ValidatorClient};
use coordinator::Coordinator;
use db::Db;
use futures::TryStreamExt;
use miden_node_utils::ErrorReport;
use miden_node_utils::lru_cache::LruCache;
use miden_remote_prover_client::RemoteTransactionProver;
use tokio::sync::{RwLock, mpsc};
use url::Url;

pub(crate) type NoteError = Arc<dyn ErrorReport + Send + Sync>;

mod actor;
mod builder;
mod chain_state;
mod clients;
mod coordinator;
pub(crate) mod db;
pub(crate) mod inflight_note;
pub mod server;

#[cfg(test)]
pub(crate) mod test_utils;

pub use builder::NetworkTransactionBuilder;

// CONSTANTS
// =================================================================================================

const COMPONENT: &str = "miden-ntx-builder";

/// Default maximum number of network notes a network transaction is allowed to consume.
const DEFAULT_MAX_NOTES_PER_TX: NonZeroUsize = NonZeroUsize::new(20).expect("literal is non-zero");
const _: () = assert!(DEFAULT_MAX_NOTES_PER_TX.get() <= miden_tx::MAX_NUM_CHECKER_NOTES);

/// Default maximum number of network transactions which should be in progress concurrently.
///
/// This only counts transactions which are being computed locally and does not include
/// uncommitted transactions in the mempool.
const DEFAULT_MAX_CONCURRENT_TXS: usize = 4;

/// Default maximum number of blocks to keep in the chain MMR.
const DEFAULT_MAX_BLOCK_COUNT: usize = 4;

/// Default channel capacity for account loading from the store.
const DEFAULT_ACCOUNT_CHANNEL_CAPACITY: usize = 1_000;

/// Default maximum number of attempts to execute a failing note before dropping it.
const DEFAULT_MAX_NOTE_ATTEMPTS: usize = 30;

/// Default script cache size.
const DEFAULT_SCRIPT_CACHE_SIZE: NonZeroUsize =
    NonZeroUsize::new(1_000).expect("literal is non-zero");

/// Default duration after which an idle network account actor will deactivate.
const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// Default maximum number of crashes an account actor is allowed before being deactivated.
const DEFAULT_MAX_ACCOUNT_CRASHES: usize = 10;

/// Default maximum number of VM execution cycles allowed for a network transaction.
///
/// This limits the computational cost of network transactions. The protocol maximum is
/// `1 << 29` but network transactions should be much cheaper.
const DEFAULT_MAX_TX_CYCLES: u32 = 1 << 19;

// CONFIGURATION
// =================================================================================================

/// Configuration for the Network Transaction Builder.
///
/// This struct contains all the settings needed to create and run a `NetworkTransactionBuilder`.
#[derive(Debug, Clone)]
pub struct NtxBuilderConfig {
    /// Address of the store gRPC server (ntx-builder API).
    pub store_url: Url,

    /// Address of the block producer gRPC server.
    pub block_producer_url: Url,

    /// Address of the validator gRPC server.
    pub validator_url: Url,

    /// Address of the remote transaction prover. If `None`, transactions will be proven locally.
    pub tx_prover_url: Option<Url>,

    /// Size of the LRU cache for note scripts. Scripts are fetched from the store and cached
    /// to avoid repeated gRPC calls.
    pub script_cache_size: NonZeroUsize,

    /// Maximum number of network transactions which should be in progress concurrently across
    /// all account actors.
    pub max_concurrent_txs: usize,

    /// Maximum number of network notes a single transaction is allowed to consume.
    pub max_notes_per_tx: NonZeroUsize,

    /// Maximum number of attempts to execute a failing note before dropping it.
    /// Notes use exponential backoff between attempts.
    pub max_note_attempts: usize,

    /// Maximum number of blocks to keep in the chain MMR. Older blocks are pruned.
    pub max_block_count: usize,

    /// Channel capacity for loading accounts from the store during startup.
    pub account_channel_capacity: usize,

    /// Duration after which an idle network account will deactivate.
    ///
    /// An account is considered idle once it has no viable notes to consume.
    /// A deactivated account will reactivate if targeted with new notes.
    pub idle_timeout: Duration,

    /// Maximum number of crashes before an account deactivated.
    ///
    /// Once this limit is reached, no new transactions will be created for this account.
    pub max_account_crashes: usize,

    /// Maximum number of VM execution cycles allowed for a single network transaction.
    ///
    /// Network transactions that exceed this limit will fail with an execution error.
    /// Defaults to 2^18 cycles.
    pub max_cycles: u32,

    /// Path to the SQLite database file used for persistent state.
    pub database_filepath: PathBuf,
}

impl NtxBuilderConfig {
    pub fn new(
        store_url: Url,
        block_producer_url: Url,
        validator_url: Url,
        database_filepath: PathBuf,
    ) -> Self {
        Self {
            store_url,
            block_producer_url,
            validator_url,
            tx_prover_url: None,
            script_cache_size: DEFAULT_SCRIPT_CACHE_SIZE,
            max_concurrent_txs: DEFAULT_MAX_CONCURRENT_TXS,
            max_notes_per_tx: DEFAULT_MAX_NOTES_PER_TX,
            max_note_attempts: DEFAULT_MAX_NOTE_ATTEMPTS,
            max_block_count: DEFAULT_MAX_BLOCK_COUNT,
            account_channel_capacity: DEFAULT_ACCOUNT_CHANNEL_CAPACITY,
            idle_timeout: DEFAULT_IDLE_TIMEOUT,
            max_account_crashes: DEFAULT_MAX_ACCOUNT_CRASHES,
            max_cycles: DEFAULT_MAX_TX_CYCLES,
            database_filepath,
        }
    }

    /// Sets the remote transaction prover URL.
    ///
    /// If not set, transactions will be proven locally.
    #[must_use]
    pub fn with_tx_prover_url(mut self, url: Option<Url>) -> Self {
        self.tx_prover_url = url;
        self
    }

    /// Sets the script cache size.
    #[must_use]
    pub fn with_script_cache_size(mut self, size: NonZeroUsize) -> Self {
        self.script_cache_size = size;
        self
    }

    /// Sets the maximum number of concurrent transactions.
    #[must_use]
    pub fn with_max_concurrent_txs(mut self, max: usize) -> Self {
        self.max_concurrent_txs = max;
        self
    }

    /// Sets the maximum number of notes per transaction.
    ///
    /// # Panics
    ///
    /// Panics if `max` exceeds `miden_tx::MAX_NUM_CHECKER_NOTES`.
    #[must_use]
    pub fn with_max_notes_per_tx(mut self, max: NonZeroUsize) -> Self {
        assert!(
            max.get() <= miden_tx::MAX_NUM_CHECKER_NOTES,
            "max_notes_per_tx ({}) exceeds MAX_NUM_CHECKER_NOTES ({})",
            max,
            miden_tx::MAX_NUM_CHECKER_NOTES
        );
        self.max_notes_per_tx = max;
        self
    }

    /// Sets the maximum number of note execution attempts.
    #[must_use]
    pub fn with_max_note_attempts(mut self, max: usize) -> Self {
        self.max_note_attempts = max;
        self
    }

    /// Sets the maximum number of blocks to keep in the chain MMR.
    #[must_use]
    pub fn with_max_block_count(mut self, max: usize) -> Self {
        self.max_block_count = max;
        self
    }

    /// Sets the account channel capacity for startup loading.
    #[must_use]
    pub fn with_account_channel_capacity(mut self, capacity: usize) -> Self {
        self.account_channel_capacity = capacity;
        self
    }

    /// Sets the idle timeout for actors.
    ///
    /// Actors that remain idle (no viable notes) for this duration will be deactivated.
    #[must_use]
    pub fn with_idle_timeout(mut self, timeout: Duration) -> Self {
        self.idle_timeout = timeout;
        self
    }

    /// Sets the maximum number of crashes before an account actor is deactivated.
    #[must_use]
    pub fn with_max_account_crashes(mut self, max: usize) -> Self {
        self.max_account_crashes = max;
        self
    }

    /// Sets the maximum number of VM execution cycles for network transactions.
    #[must_use]
    pub fn with_max_cycles(mut self, max: u32) -> Self {
        self.max_cycles = max;
        self
    }

    /// Builds and initializes the network transaction builder.
    ///
    /// This method connects to the store and block producer services, fetches the current
    /// chain tip, and subscribes to mempool events.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The store connection fails
    /// - The mempool subscription fails (after retries)
    /// - The store contains no blocks (not bootstrapped)
    pub async fn build(self) -> anyhow::Result<NetworkTransactionBuilder> {
        // Set up the database (bootstrap + connection pool).
        let db = Db::setup(self.database_filepath.clone()).await?;

        // Purge inflight state from previous run.
        db.purge_inflight().await.context("failed to purge inflight state")?;

        let script_cache = LruCache::new(self.script_cache_size);
        let coordinator =
            Coordinator::new(self.max_concurrent_txs, self.max_account_crashes, db.clone());

        let store = StoreClient::new(self.store_url.clone());
        let block_producer = BlockProducerClient::new(self.block_producer_url.clone());
        let validator = ValidatorClient::new(self.validator_url.clone());
        let prover = self.tx_prover_url.clone().map(RemoteTransactionProver::new);

        // Subscribe to mempool first to ensure we don't miss any events. The subscription
        // replays all inflight transactions, so the subscriber's state is fully reconstructed.
        let subscription = block_producer
            .subscribe_to_mempool_with_retry()
            .await
            .map_err(|err| anyhow::anyhow!(err))
            .context("failed to subscribe to mempool events")?;
        let mempool_events: MempoolEventStream = Box::pin(subscription.into_stream());

        let (chain_tip_header, chain_mmr) = store
            .get_latest_blockchain_data_with_retry()
            .await?
            .context("store should contain a latest block")?;

        // Store the chain tip in the DB.
        db.upsert_chain_state(chain_tip_header.block_num(), chain_tip_header.clone())
            .await
            .context("failed to upsert chain state")?;

        let chain_state = Arc::new(RwLock::new(ChainState::new(chain_tip_header, chain_mmr)));

        let (request_tx, actor_request_rx) = mpsc::channel(1);

        let actor_context = AccountActorContext {
            block_producer: block_producer.clone(),
            validator,
            prover,
            chain_state: chain_state.clone(),
            store: store.clone(),
            script_cache,
            max_notes_per_tx: self.max_notes_per_tx,
            max_note_attempts: self.max_note_attempts,
            idle_timeout: self.idle_timeout,
            db: db.clone(),
            request_tx,
            max_cycles: self.max_cycles,
        };

        Ok(NetworkTransactionBuilder::new(
            self,
            coordinator,
            store,
            db,
            chain_state,
            actor_context,
            mempool_events,
            actor_request_rx,
        ))
    }
}