miden_node_ntx_builder/builder/
mod.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::Context;
6use futures::TryStreamExt;
7use miden_node_proto::domain::account::NetworkAccountPrefix;
8use miden_node_utils::ErrorReport;
9use miden_remote_prover_client::remote_prover::tx_prover::RemoteTransactionProver;
10use tokio::sync::Barrier;
11use tokio::time;
12use url::Url;
13
14use crate::MAX_IN_PROGRESS_TXS;
15use crate::block_producer::BlockProducerClient;
16use crate::store::StoreClient;
17use crate::transaction::NtxError;
18
19// NETWORK TRANSACTION BUILDER
20// ================================================================================================
21
22/// Network transaction builder component.
23///
24/// The network transaction builder is in in charge of building transactions that consume notes
25/// against network accounts. These notes are identified and communicated by the block producer.
26/// The service maintains a list of unconsumed notes and periodically executes and proves
27/// transactions that consume them (reaching out to the store to retrieve state as necessary).
28pub struct NetworkTransactionBuilder {
29    /// Address of the store gRPC server.
30    store_url: Url,
31    /// Address of the block producer gRPC server.
32    block_producer_url: Url,
33    /// Address of the remote prover. If `None`, transactions will be proven locally, which is
34    /// undesirable due to the perofmrance impact.
35    tx_prover_url: Option<Url>,
36    /// Interval for checking pending notes and executing network transactions.
37    ticker_interval: Duration,
38    /// A checkpoint used to sync start-up process with the block-producer.
39    ///
40    /// This informs the block-producer when we have subscribed to mempool events and that it is
41    /// safe to begin block-production.
42    bp_checkpoint: Arc<Barrier>,
43}
44
45impl NetworkTransactionBuilder {
46    /// Creates a new instance of the network transaction builder.
47    pub fn new(
48        store_url: Url,
49        block_producer_url: Url,
50        tx_prover_url: Option<Url>,
51        ticker_interval: Duration,
52        bp_checkpoint: Arc<Barrier>,
53    ) -> Self {
54        Self {
55            store_url,
56            block_producer_url,
57            tx_prover_url,
58            ticker_interval,
59            bp_checkpoint,
60        }
61    }
62
63    pub async fn serve_new(self) -> anyhow::Result<()> {
64        let store = StoreClient::new(self.store_url);
65        let block_producer = BlockProducerClient::new(self.block_producer_url);
66
67        let mut state = crate::state::State::load(store.clone())
68            .await
69            .context("failed to load ntx state")?;
70
71        let mut mempool_events = block_producer
72            .subscribe_to_mempool_with_retry(state.chain_tip())
73            .await
74            .context("failed to subscribe to mempool events")?;
75
76        // Unlock the block-producer's block production. The block-producer is prevented from
77        // producing blocks until we have subscribed to mempool events.
78        //
79        // This is a temporary work-around until the ntb can resync on the fly.
80        self.bp_checkpoint.wait().await;
81
82        let prover = self.tx_prover_url.map(RemoteTransactionProver::new);
83
84        let mut interval = tokio::time::interval(self.ticker_interval);
85        interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
86
87        // Tracks network transaction tasks until they are submitted to the mempool.
88        //
89        // We also map the task ID to the network account so we can mark it as failed if it doesn't
90        // get submitted.
91        let mut inflight = JoinSet::new();
92        let mut inflight_idx = HashMap::new();
93
94        let context = crate::transaction::NtxContext {
95            block_producer: block_producer.clone(),
96            prover,
97            store,
98        };
99
100        loop {
101            tokio::select! {
102                _next = interval.tick() => {
103                    if inflight.len() > MAX_IN_PROGRESS_TXS {
104                        tracing::info!("At maximum network tx capacity, skipping");
105                        continue;
106                    }
107
108                    let Some(candidate) = state.select_candidate(crate::MAX_NOTES_PER_TX) else {
109                        tracing::debug!("No candidate network transaction available");
110                        continue;
111                    };
112
113                    let network_account_prefix = NetworkAccountPrefix::try_from(candidate.account.id())
114                                                 .expect("all accounts managed by NTB are network accounts");
115                    let indexed_candidate = (network_account_prefix, candidate.chain_tip_header.block_num());
116                    let task_id = inflight.spawn({
117                        let context = context.clone();
118                        context.execute_transaction(candidate)
119                    }).id();
120
121                    // SAFETY: This is definitely a network account.
122                    inflight_idx.insert(task_id, indexed_candidate);
123                },
124                event = mempool_events.try_next() => {
125                    let event = event
126                                .context("mempool event stream ended")?
127                                .context("mempool event stream failed")?;
128                    state.mempool_update(event).await.context("failed to update state")?;
129                },
130                completed = inflight.join_next_with_id() => {
131                    // Grab the task ID and associated network account reference.
132                    let task_id = match &completed {
133                        Ok((task_id, _)) => *task_id,
134                        Err(join_handle) => join_handle.id(),
135                    };
136                    // SAFETY: both inflights should have the same set.
137                    let (candidate, block_num) = inflight_idx.remove(&task_id).unwrap();
138
139                    match completed {
140                        // Some notes failed.
141                        Ok((_, Ok(failed))) => {
142                            let notes = failed.into_iter().map(|note| note.note).collect::<Vec<_>>();
143                            state.notes_failed(candidate, notes.as_slice(), block_num);
144                        },
145                        // Transaction execution failed.
146                        Ok((_, Err(err))) => {
147                            tracing::warn!(err=err.as_report(), "network transaction failed");
148                            match err {
149                                NtxError::AllNotesFailed(failed) => {
150                                    let notes = failed.into_iter().map(|note| note.note).collect::<Vec<_>>();
151                                    state.notes_failed(candidate, notes.as_slice(), block_num);
152                                },
153                                NtxError::InputNotes(_)
154                                | NtxError::NoteFilter(_)
155                                | NtxError::Execution(_)
156                                | NtxError::Proving(_)
157                                | NtxError::Submission(_)
158                                | NtxError::Panic(_) => {},
159                            }
160                            state.candidate_failed(candidate);
161                        },
162                        // Unexpected error occurred.
163                        Err(err) => {
164                            tracing::warn!(err=err.as_report(), "network transaction panicked");
165                            state.candidate_failed(candidate);
166                        }
167                    }
168                }
169            }
170        }
171    }
172}
173
174/// A wrapper arounnd tokio's [`JoinSet`](tokio::task::JoinSet) which returns pending instead of
175/// [`None`] if its empty.
176///
177/// This makes it much more convenient to use in a `select!`.
178struct JoinSet<T>(tokio::task::JoinSet<T>);
179
180impl<T> JoinSet<T>
181where
182    T: 'static,
183{
184    fn new() -> Self {
185        Self(tokio::task::JoinSet::new())
186    }
187
188    fn spawn<F>(&mut self, task: F) -> tokio::task::AbortHandle
189    where
190        F: Future<Output = T>,
191        F: Send + 'static,
192        T: Send,
193    {
194        self.0.spawn(task)
195    }
196
197    async fn join_next_with_id(&mut self) -> Result<(tokio::task::Id, T), tokio::task::JoinError> {
198        if self.0.is_empty() {
199            std::future::pending().await
200        } else {
201            // Cannot be None as its not empty.
202            self.0.join_next_with_id().await.unwrap()
203        }
204    }
205
206    fn len(&self) -> usize {
207        self.0.len()
208    }
209}