//! miden-node-ntx-builder 0.14.4
//!
//! Miden node's network transaction builder component.
//! See the crate documentation for details.
use std::pin::Pin;
use std::sync::Arc;

use anyhow::Context;
use futures::Stream;
use miden_node_proto::domain::account::NetworkAccountId;
use miden_node_proto::domain::mempool::MempoolEvent;
use miden_protocol::account::delta::AccountUpdateDetails;
use miden_protocol::block::BlockHeader;
use tokio::net::TcpListener;
use tokio::sync::{RwLock, mpsc};
use tokio::task::JoinSet;
use tokio_stream::StreamExt;
use tonic::Status;

use crate::NtxBuilderConfig;
use crate::actor::{AccountActorContext, AccountOrigin, ActorRequest};
use crate::chain_state::ChainState;
use crate::clients::StoreClient;
use crate::coordinator::Coordinator;
use crate::db::Db;
use crate::server::NtxBuilderRpcServer;

// NETWORK TRANSACTION BUILDER
// ================================================================================================

/// A boxed, pinned stream of mempool events with a `'static` lifetime.
///
/// Boxing gives the stream a `'static` lifetime by ensuring it owns all its data, avoiding
/// complex lifetime annotations that would otherwise be required when storing `impl TryStream`.
///
/// Each item is either a [`MempoolEvent`] or the [`Status`] error yielded by the underlying
/// gRPC stream.
pub(crate) type MempoolEventStream =
    Pin<Box<dyn Stream<Item = Result<MempoolEvent, Status>> + Send>>;

/// Network transaction builder component.
///
/// The network transaction builder is in charge of building transactions that consume notes
/// against network accounts. These notes are identified and communicated by the block producer.
/// The service maintains a list of unconsumed notes and periodically executes and proves
/// transactions that consume them (reaching out to the store to retrieve state as necessary).
///
/// The builder manages the tasks for every network account on the chain through the coordinator.
///
/// Create an instance using [`NtxBuilderConfig::build()`], then drive it with
/// [`NetworkTransactionBuilder::run()`].
pub struct NetworkTransactionBuilder {
    /// Configuration for the builder.
    config: NtxBuilderConfig,
    /// Coordinator for managing actor tasks.
    coordinator: Coordinator,
    /// Client for the store gRPC API.
    store: StoreClient,
    /// Database for persistent state.
    db: Db,
    /// Shared chain state updated by the event loop and read by actors.
    chain_state: Arc<RwLock<ChainState>>,
    /// Context shared with all account actors.
    actor_context: AccountActorContext,
    /// Stream of mempool events from the block producer.
    mempool_events: MempoolEventStream,
    /// Database update requests from account actors.
    ///
    /// We keep database writes centralized so this is how actors communicate
    /// items to write.
    actor_request_rx: mpsc::Receiver<ActorRequest>,
}

impl NetworkTransactionBuilder {
    /// Creates a new [`NetworkTransactionBuilder`] from its pre-constructed parts.
    ///
    /// This is pure wiring: no I/O is performed here. Callers are expected to go through
    /// [`NtxBuilderConfig::build()`], which assembles all of these parts.
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        config: NtxBuilderConfig,
        coordinator: Coordinator,
        store: StoreClient,
        db: Db,
        chain_state: Arc<RwLock<ChainState>>,
        actor_context: AccountActorContext,
        mempool_events: MempoolEventStream,
        actor_request_rx: mpsc::Receiver<ActorRequest>,
    ) -> Self {
        Self {
            config,
            coordinator,
            store,
            db,
            chain_state,
            actor_context,
            mempool_events,
            actor_request_rx,
        }
    }

    /// Runs the network transaction builder until a fatal error occurs.
    ///
    /// When a `TcpListener` is supplied, a gRPC server is spawned alongside the event loop to
    /// expose the `GetNoteError` endpoint.
    ///
    /// This method:
    /// 1. Optionally starts a gRPC server for note error queries
    /// 2. Spawns the main event loop, which processes mempool events and manages actors
    /// 3. Waits for the first task to complete; any completion is treated as fatal
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The mempool event stream ends unexpectedly
    /// - An actor encounters a fatal error
    /// - The account loader task fails
    /// - The gRPC server fails
    pub async fn run(self, listener: Option<TcpListener>) -> anyhow::Result<()> {
        let mut tasks = JoinSet::new();

        // Expose the gRPC API only when the caller supplied a listener.
        if let Some(listener) = listener {
            let rpc_server = NtxBuilderRpcServer::new(self.db.clone());
            tasks.spawn(async move {
                rpc_server.serve(listener).await.context("ntx-builder gRPC server failed")
            });
        }

        // The event loop runs as a sibling task of the (optional) gRPC server.
        tasks.spawn(self.run_event_loop());

        // Neither task is expected to complete, so whichever finishes first — panicked,
        // errored, or even returned cleanly — determines the builder's outcome.
        match tasks.join_next().await {
            Some(joined) => joined.context("ntx-builder task panicked")?,
            None => Ok(()),
        }
    }

    /// Runs the main event loop.
    ///
    /// Concurrently:
    /// - drives actor tasks via the coordinator, respawning timed-out actors,
    /// - applies mempool events from the block producer,
    /// - bootstraps actors for network accounts streamed from the store at startup,
    /// - services database-write requests sent by actors.
    ///
    /// The loop has no normal exit; it only returns on a fatal error.
    async fn run_event_loop(mut self) -> anyhow::Result<()> {
        // Spawn a background task to load network accounts from the store.
        // Accounts are sent through a channel and processed in the main event loop.
        let (account_tx, mut account_rx) =
            mpsc::channel::<NetworkAccountId>(self.config.account_channel_capacity);
        let account_loader_store = self.store.clone();
        let mut account_loader_handle = tokio::spawn(async move {
            account_loader_store
                .stream_network_account_ids(account_tx)
                .await
                .context("failed to load network accounts from store")
        });

        // Main event loop. `select!` polls the branches in random order by default, so no
        // branch can starve the others.
        loop {
            tokio::select! {
                // Handle actor result. If a timed-out actor needs respawning, do so.
                result = self.coordinator.next() => {
                    if let Some(account_id) = result? {
                        self.coordinator
                            .spawn_actor(AccountOrigin::store(account_id), &self.actor_context);
                    }
                },
                // Handle mempool events. A `None` (stream ended) or an inner error is fatal.
                event = self.mempool_events.next() => {
                    let event = event
                        .context("mempool event stream ended")?
                        .context("mempool event stream failed")?;

                    self.handle_mempool_event(event).await?;
                },
                // Handle account batches loaded from the store.
                // Once all accounts are loaded, the channel closes and this branch
                // becomes inactive (recv returns None and we stop matching).
                Some(account_id) = account_rx.recv() => {
                    self.handle_loaded_account(account_id).await?;
                },
                // Handle requests from actors.
                Some(request) = self.actor_request_rx.recv() => {
                    self.handle_actor_request(request).await?;
                },
                // Handle account loader task completion/failure.
                // If the task fails, we abort since the builder would be in a degraded state
                // where existing notes against network accounts won't be processed.
                result = &mut account_loader_handle => {
                    result
                        .context("account loader task panicked")
                        .flatten()?;

                    tracing::info!("account loading from store completed");
                    // Replace the finished handle with a never-completing task so this
                    // branch stays dormant for the remainder of the loop's lifetime.
                    account_loader_handle = tokio::spawn(std::future::pending());
                },
            }
        }
    }

    /// Handles account IDs loaded from the store by syncing state to DB and spawning actors.
    ///
    /// Fetches the account and its unconsumed network notes from the store, persists both to
    /// the local database, and spawns an actor for the account.
    ///
    /// # Errors
    ///
    /// Returns an error if a store call fails, the account is unexpectedly absent from the
    /// store, or the database sync fails.
    #[tracing::instrument(name = "ntx.builder.handle_loaded_account", skip(self, account_id))]
    async fn handle_loaded_account(
        &mut self,
        account_id: NetworkAccountId,
    ) -> Result<(), anyhow::Error> {
        // Fetch account from store and write to DB.
        let account = self
            .store
            .get_network_account(account_id)
            .await
            .context("failed to load account from store")?
            .context("account should exist in store")?;

        // Notes are fetched relative to the current chain tip so the store can exclude
        // notes that are no longer consumable at this height.
        let block_num = self.chain_state.read().await.chain_tip_header.block_num();
        let notes = self
            .store
            .get_unconsumed_network_notes(account_id, block_num.as_u32())
            .await
            .context("failed to load notes from store")?;

        // Write account and notes to DB. Both values are moved, not cloned: neither is
        // used again after this call.
        self.db
            .sync_account_from_store(account_id, account, notes)
            .await
            .context("failed to sync account to DB")?;

        self.coordinator
            .spawn_actor(AccountOrigin::store(account_id), &self.actor_context);
        Ok(())
    }

    /// Handles mempool events by writing to DB first, then notifying actors.
    ///
    /// The DB-first ordering guarantees that any actor notified (or spawned) for an event
    /// observes database state that already reflects that event.
    #[tracing::instrument(name = "ntx.builder.handle_mempool_event", skip(self, event))]
    async fn handle_mempool_event(&mut self, event: MempoolEvent) -> Result<(), anyhow::Error> {
        match &event {
            MempoolEvent::TransactionAdded { account_delta, .. } => {
                // Write event effects to DB first.
                self.coordinator
                    .write_event(&event)
                    .await
                    .context("failed to write TransactionAdded to DB")?;

                // Handle account deltas in case an account is being created.
                if let Some(AccountUpdateDetails::Delta(delta)) = account_delta {
                    // Handle account deltas for network accounts only; presumably
                    // `AccountOrigin::transaction` yields `None` for non-network accounts.
                    if let Some(network_account) = AccountOrigin::transaction(delta) {
                        // Spawn new actors if a transaction creates a new network account.
                        // A full-state delta signals the account's first appearance.
                        let is_creating_account = delta.is_full_state();
                        if is_creating_account {
                            self.coordinator.spawn_actor(network_account, &self.actor_context);
                        }
                    }
                }
                // Forward the event to the actors it targets; targets without a live actor
                // are returned so they can be spawned from store state.
                let inactive_targets = self.coordinator.send_targeted(&event);
                for account_id in inactive_targets {
                    self.coordinator
                        .spawn_actor(AccountOrigin::store(account_id), &self.actor_context);
                }
                Ok(())
            },
            // Update chain state and notify affected actors.
            MempoolEvent::BlockCommitted { header, .. } => {
                // Write event effects to DB first.
                let result = self
                    .coordinator
                    .write_event(&event)
                    .await
                    .context("failed to write BlockCommitted to DB")?;

                // Advance the shared chain tip before waking actors so they read the new
                // height.
                self.update_chain_tip(header.as_ref().clone()).await;
                self.coordinator.notify_accounts(&result.accounts_to_notify);
                Ok(())
            },
            // Notify affected actors (reverted account actors will self-cancel when they
            // detect their account has been removed from the DB).
            MempoolEvent::TransactionsReverted(_) => {
                // Write event effects to DB first.
                let result = self
                    .coordinator
                    .write_event(&event)
                    .await
                    .context("failed to write TransactionsReverted to DB")?;

                self.coordinator.notify_accounts(&result.accounts_to_notify);
                Ok(())
            },
        }
    }

    /// Processes a request from an account actor.
    ///
    /// Database writes are centralized in the event loop: actors never write to the DB
    /// themselves and instead send one of these requests over the actor-request channel.
    async fn handle_actor_request(&mut self, request: ActorRequest) -> Result<(), anyhow::Error> {
        match request {
            ActorRequest::NotesFailed { failed_notes, block_num, ack_tx } => {
                self.db
                    .notes_failed(failed_notes, block_num)
                    .await
                    .context("failed to mark notes as failed")?;
                // The requesting actor may already be gone; a dropped ack is not an error.
                ack_tx.send(()).ok();
                Ok(())
            },
            ActorRequest::CacheNoteScript { script_root, script } => self
                .db
                .insert_note_script(script_root, &script)
                .await
                .context("failed to cache note script")
                .map(drop),
        }
    }

    /// Updates the chain tip and prunes old blocks from the MMR.
    ///
    /// The MMR intentionally lags the tip by one block: the previous tip header is appended
    /// to the MMR first, and only then is the new tip stored. Afterwards the MMR is pruned so
    /// it retains at most `config.max_block_count` blocks.
    async fn update_chain_tip(&mut self, tip: BlockHeader) {
        let mut chain_state = self.chain_state.write().await;

        // Update MMR which lags by one block.
        // NOTE(review): the meaning of the `true` flag isn't visible here — presumably it
        // tells `add_block` to retain this block's authentication data; confirm against the
        // MMR API.
        let mmr_tip = chain_state.chain_tip_header.clone();
        Arc::make_mut(&mut chain_state.chain_mmr).add_block(&mmr_tip, true);

        // Set the new tip.
        chain_state.chain_tip_header = tip;

        // Keep MMR pruned: discard everything more than `max_block_count` blocks behind the
        // chain length. The `as u32` cast assumes block heights fit in u32 — TODO confirm.
        let pruned_block_height = (chain_state
            .chain_mmr
            .chain_length()
            .as_usize()
            .saturating_sub(self.config.max_block_count)) as u32;
        Arc::make_mut(&mut chain_state.chain_mmr).prune_to(..pruned_block_height.into());
    }
}