miden_node_ntx_builder/builder/
mod.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::Context;
6use futures::TryStreamExt;
7use miden_node_proto::domain::account::NetworkAccountPrefix;
8use miden_node_utils::ErrorReport;
9use miden_remote_prover_client::remote_prover::tx_prover::RemoteTransactionProver;
10use tokio::sync::Barrier;
11use tokio::time;
12use url::Url;
13
14use crate::MAX_IN_PROGRESS_TXS;
15use crate::block_producer::BlockProducerClient;
16use crate::store::StoreClient;
17
// NETWORK TRANSACTION BUILDER
// ================================================================================================
20
21/// Network transaction builder component.
22///
23/// The network transaction builder is in in charge of building transactions that consume notes
24/// against network accounts. These notes are identified and communicated by the block producer.
25/// The service maintains a list of unconsumed notes and periodically executes and proves
26/// transactions that consume them (reaching out to the store to retrieve state as necessary).
27pub struct NetworkTransactionBuilder {
28    /// Address of the store gRPC server.
29    store_url: Url,
30    /// Address of the block producer gRPC server.
31    block_producer_url: Url,
32    /// Address of the remote prover. If `None`, transactions will be proven locally, which is
33    /// undesirable due to the perofmrance impact.
34    tx_prover_url: Option<Url>,
35    /// Interval for checking pending notes and executing network transactions.
36    ticker_interval: Duration,
37    /// A checkpoint used to sync start-up process with the block-producer.
38    ///
39    /// This informs the block-producer when we have subscribed to mempool events and that it is
40    /// safe to begin block-production.
41    bp_checkpoint: Arc<Barrier>,
42}
43
44impl NetworkTransactionBuilder {
45    /// Creates a new instance of the network transaction builder.
46    pub fn new(
47        store_url: Url,
48        block_producer_url: Url,
49        tx_prover_url: Option<Url>,
50        ticker_interval: Duration,
51        bp_checkpoint: Arc<Barrier>,
52    ) -> Self {
53        Self {
54            store_url,
55            block_producer_url,
56            tx_prover_url,
57            ticker_interval,
58            bp_checkpoint,
59        }
60    }
61
62    pub async fn serve_new(self) -> anyhow::Result<()> {
63        let store = StoreClient::new(self.store_url);
64        let block_producer = BlockProducerClient::new(self.block_producer_url);
65
66        let mut state = crate::state::State::load(store.clone())
67            .await
68            .context("failed to load ntx state")?;
69
70        let mut mempool_events = block_producer
71            .subscribe_to_mempool_with_retry(state.chain_tip())
72            .await
73            .context("failed to subscribe to mempool events")?;
74
75        // Unlock the block-producer's block production. The block-producer is prevented from
76        // producing blocks until we have subscribed to mempool events.
77        //
78        // This is a temporary work-around until the ntb can resync on the fly.
79        self.bp_checkpoint.wait().await;
80
81        let prover = self.tx_prover_url.map(RemoteTransactionProver::new);
82
83        let mut interval = tokio::time::interval(self.ticker_interval);
84        interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
85
86        // Tracks network transaction tasks until they are submitted to the mempool.
87        //
88        // We also map the task ID to the network account so we can mark it as failed if it doesn't
89        // get submitted.
90        let mut inflight = JoinSet::new();
91        let mut inflight_idx = HashMap::new();
92
93        let context = crate::transaction::NtxContext {
94            block_producer: block_producer.clone(),
95            prover,
96            store,
97        };
98
99        loop {
100            tokio::select! {
101                _next = interval.tick() => {
102                    if inflight.len() > MAX_IN_PROGRESS_TXS {
103                        tracing::info!("At maximum network tx capacity, skipping");
104                        continue;
105                    }
106
107                    let Some(candidate) = state.select_candidate(crate::MAX_NOTES_PER_TX) else {
108                        tracing::debug!("No candidate network transaction available");
109                        continue;
110                    };
111
112                    let network_account_prefix = NetworkAccountPrefix::try_from(candidate.account.id())
113                                                 .expect("all accounts managed by NTB are network accounts");
114                    let indexed_candidate = (network_account_prefix, candidate.chain_tip_header.block_num());
115                    let task_id = inflight.spawn({
116                        let context = context.clone();
117                        context.execute_transaction(candidate)
118                    }).id();
119
120                    // SAFETY: This is definitely a network account.
121                    inflight_idx.insert(task_id, indexed_candidate);
122                },
123                event = mempool_events.try_next() => {
124                    let event = event
125                                .context("mempool event stream ended")?
126                                .context("mempool event stream failed")?;
127                    state.mempool_update(event).await.context("failed to update state")?;
128                },
129                completed = inflight.join_next_with_id() => {
130                    // Grab the task ID and associated network account reference.
131                    let task_id = match &completed {
132                        Ok((task_id, _)) => *task_id,
133                        Err(join_handle) => join_handle.id(),
134                    };
135                    // SAFETY: both inflights should have the same set.
136                    let (candidate, block_num) = inflight_idx.remove(&task_id).unwrap();
137
138                    match completed {
139                        // Some notes failed.
140                        Ok((_, Ok(failed))) => {
141                            let notes = failed.into_iter().map(|note| note.note).collect::<Vec<_>>();
142                            state.notes_failed(candidate, notes.as_slice(), block_num);
143                        },
144                        // Transaction execution failed.
145                        Ok((_, Err((notes, err)))) => {
146                            tracing::warn!(err=err.as_report(), "network transaction failed");
147                            // Always mark notes as failed. They can get retried eventually.
148                            state.notes_failed(candidate, notes.as_slice(), block_num);
149
150                            state.candidate_failed(candidate);
151                        },
152                        // Unexpected error occurred.
153                        Err(err) => {
154                            tracing::warn!(err=err.as_report(), "network transaction panicked");
155                            state.candidate_failed(candidate);
156                        }
157                    }
158                }
159            }
160        }
161    }
162}
163
164/// A wrapper arounnd tokio's [`JoinSet`](tokio::task::JoinSet) which returns pending instead of
165/// [`None`] if its empty.
166///
167/// This makes it much more convenient to use in a `select!`.
168struct JoinSet<T>(tokio::task::JoinSet<T>);
169
170impl<T> JoinSet<T>
171where
172    T: 'static,
173{
174    fn new() -> Self {
175        Self(tokio::task::JoinSet::new())
176    }
177
178    fn spawn<F>(&mut self, task: F) -> tokio::task::AbortHandle
179    where
180        F: Future<Output = T>,
181        F: Send + 'static,
182        T: Send,
183    {
184        self.0.spawn(task)
185    }
186
187    async fn join_next_with_id(&mut self) -> Result<(tokio::task::Id, T), tokio::task::JoinError> {
188        if self.0.is_empty() {
189            std::future::pending().await
190        } else {
191            // Cannot be None as its not empty.
192            self.0.join_next_with_id().await.unwrap()
193        }
194    }
195
196    fn len(&self) -> usize {
197        self.0.len()
198    }
199}