miden_node_ntx_builder/builder/
mod.rs

1use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
2
3use anyhow::Context;
4use futures::TryStreamExt;
5use miden_node_proto::domain::account::NetworkAccountPrefix;
6use miden_node_utils::ErrorReport;
7use miden_remote_prover_client::remote_prover::tx_prover::RemoteTransactionProver;
8use tokio::{sync::Barrier, time};
9use url::Url;
10
11use crate::{MAX_IN_PROGRESS_TXS, block_producer::BlockProducerClient, store::StoreClient};
12
13// NETWORK TRANSACTION BUILDER
14// ================================================================================================
15
/// Network transaction builder component.
///
/// The network transaction builder is in charge of building transactions that consume notes
/// against network accounts. These notes are identified and communicated by the block producer.
/// The service maintains a list of unconsumed notes and periodically executes and proves
/// transactions that consume them (reaching out to the store to retrieve state as necessary).
pub struct NetworkTransactionBuilder {
    /// Address of the store gRPC server.
    pub store_url: Url,
    /// Address of the block producer gRPC server.
    pub block_producer_address: SocketAddr,
    /// Address of the remote prover. If `None`, transactions will be proven locally, which is
    /// undesirable due to the performance impact.
    pub tx_prover_url: Option<Url>,
    /// Interval for checking pending notes and executing network transactions.
    pub ticker_interval: Duration,
    /// A checkpoint used to sync the start-up process with the block-producer.
    ///
    /// This informs the block-producer when we have subscribed to mempool events and that it is
    /// safe to begin block-production.
    pub bp_checkpoint: Arc<Barrier>,
}
38
39impl NetworkTransactionBuilder {
40    pub async fn serve_new(self) -> anyhow::Result<()> {
41        let store = StoreClient::new(&self.store_url);
42        let block_producer = BlockProducerClient::new(self.block_producer_address);
43
44        // Retry until the store is up and running. After this we expect all requests to pass.
45        let genesis_header = store
46            .genesis_header_with_retry()
47            .await
48            .context("failed to fetch genesis header")?;
49
50        let mut state = crate::state::State::load(store.clone())
51            .await
52            .context("failed to load ntx state")?;
53
54        let (chain_tip, _mmr) = store
55            .get_current_blockchain_data(None)
56            .await
57            .context("failed to fetch the chain tip data from the store")?
58            .context("chain tip data was None")?;
59
60        let mut mempool_events = block_producer
61            .subscribe_to_mempool_with_retry(chain_tip.block_num())
62            .await
63            .context("failed to subscribe to mempool events")?;
64
65        // Unlock the block-producer's block production. The block-producer is prevented from
66        // producing blocks until we have subscribed to mempool events.
67        //
68        // This is a temporary work-around until the ntb can resync on the fly.
69        self.bp_checkpoint.wait().await;
70
71        let prover = self.tx_prover_url.map(RemoteTransactionProver::new);
72
73        let mut interval = tokio::time::interval(self.ticker_interval);
74        interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
75
76        // Tracks network transaction tasks until they are submitted to the mempool.
77        //
78        // We also map the task ID to the network account so we can mark it as failed if it doesn't
79        // get submitted.
80        let mut inflight = JoinSet::new();
81        let mut inflight_idx = HashMap::new();
82
83        let context = crate::transaction::NtxContext {
84            block_producer: block_producer.clone(),
85            genesis_header,
86            prover,
87        };
88
89        loop {
90            tokio::select! {
91                _next = interval.tick() => {
92                    if inflight.len() > MAX_IN_PROGRESS_TXS {
93                        tracing::info!("At maximum network tx capacity, skipping");
94                        continue;
95                    }
96
97                    let Some(candidate) = state.select_candidate(crate::MAX_NOTES_PER_TX) else {
98                        tracing::debug!("No candidate network transaction available");
99                        continue;
100                    };
101
102                    let prefix = NetworkAccountPrefix::try_from(candidate.account.id()).unwrap();
103                    let task_id = inflight.spawn({
104                        let context = context.clone();
105                        context.execute_transaction(candidate)
106                    }).id();
107
108                    // SAFETY: This is definitely a network account.
109                    inflight_idx.insert(task_id, prefix);
110                },
111                event = mempool_events.try_next() => {
112                    let event = event
113                        .context("mempool event stream ended")?
114                        .context("mempool event stream failed")?;
115                    state.mempool_update(event).await.context("failed to update state")?;
116                },
117                completed = inflight.join_next_with_id() => {
118                    // Grab the task ID and associated network account reference.
119                    let task_id = match &completed {
120                        Ok((task_id, _)) => *task_id,
121                        Err(join_handle) => join_handle.id(),
122                    };
123                    // SAFETY: both inflights should have the same set.
124                    let candidate = inflight_idx.remove(&task_id).unwrap();
125
126                    match completed {
127                        // Nothing to do. State will be updated by the eventual mempool event.
128                        Ok((_, Ok(_))) => {},
129                        // Inform state if the tx failed.
130                        Ok((_, Err(err))) => {
131                            tracing::warn!(err=err.as_report(), "network transaction failed");
132                            state.candidate_failed(candidate);
133                        },
134                        Err(err) => {
135                            tracing::warn!(err=err.as_report(), "network transaction panic'd");
136                            state.candidate_failed(candidate);
137                        }
138                    }
139                }
140            }
141        }
142    }
143}
144
/// A wrapper around tokio's [`JoinSet`](tokio::task::JoinSet) which returns pending instead of
/// [`None`] if it is empty.
///
/// This makes it much more convenient to use in a `select!`.
struct JoinSet<T>(tokio::task::JoinSet<T>);
150
151impl<T> JoinSet<T>
152where
153    T: 'static,
154{
155    fn new() -> Self {
156        Self(tokio::task::JoinSet::new())
157    }
158
159    fn spawn<F>(&mut self, task: F) -> tokio::task::AbortHandle
160    where
161        F: Future<Output = T>,
162        F: Send + 'static,
163        T: Send,
164    {
165        self.0.spawn(task)
166    }
167
168    async fn join_next_with_id(&mut self) -> Result<(tokio::task::Id, T), tokio::task::JoinError> {
169        if self.0.is_empty() {
170            std::future::pending().await
171        } else {
172            // Cannot be None as its not empty.
173            self.0.join_next_with_id().await.unwrap()
174        }
175    }
176
177    fn len(&self) -> usize {
178        self.0.len()
179    }
180}