miden_node_ntx_builder/builder/
mod.rs

1use std::{collections::BTreeSet, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};
2
3use anyhow::Context;
4use block_producer::BlockProducerClient;
5use data_store::NtxBuilderDataStore;
6use futures::TryFutureExt;
7use miden_node_proto::{
8    domain::{account::NetworkAccountError, note::NetworkNote},
9    generated::ntx_builder::api_server,
10};
11use miden_node_utils::tracing::OpenTelemetrySpanExt;
12use miden_objects::{
13    AccountError, TransactionInputError,
14    account::AccountId,
15    assembly::DefaultSourceManager,
16    block::BlockNumber,
17    note::{Note, NoteId, NoteTag},
18    transaction::{ExecutedTransaction, InputNote, InputNotes, TransactionArgs},
19};
20use miden_tx::{
21    NoteAccountExecution, NoteConsumptionChecker, TransactionExecutor, TransactionExecutorError,
22    TransactionProverError,
23};
24use prover::NtbTransactionProver;
25use server::{NtxBuilderApi, PendingNotes};
26use store::{StoreClient, StoreError};
27use thiserror::Error;
28use tokio::{
29    net::TcpListener,
30    runtime::Builder as RtBuilder,
31    sync::Mutex,
32    task::{JoinHandle, spawn_blocking},
33    time,
34};
35use tokio_stream::wrappers::TcpListenerStream;
36use tower_http::trace::TraceLayer;
37use tracing::{Instrument, Span, debug, error, info, info_span, instrument, warn};
38use url::Url;
39
40use crate::COMPONENT;
41
42mod block_producer;
43mod data_store;
44mod prover;
45mod server;
46mod store;
47
48type SharedPendingNotes = Arc<Mutex<PendingNotes>>;
49
50// NETWORK TRANSACTION REQUEST
51// ================================================================================================
52
53#[derive(Clone, Debug)]
54struct NetworkTransactionRequest {
55    pub account_id: AccountId,
56    pub block_ref: BlockNumber,
57    pub notes_to_execute: Vec<NetworkNote>,
58}
59
60impl NetworkTransactionRequest {
61    fn new(account_id: AccountId, block_ref: BlockNumber, notes: Vec<NetworkNote>) -> Self {
62        Self {
63            account_id,
64            block_ref,
65            notes_to_execute: notes,
66        }
67    }
68}
69
70// NETWORK TRANSACTION BUILDER
71// ================================================================================================
72
73/// Network transaction builder component.
74///
75/// The network transaction builder is in in charge of building transactions that consume notes
76/// against network accounts. These notes are identified and communicated by the block producer.
77/// The service maintains a list of unconsumed notes and periodically executes and proves
78/// transactions that consume them (reaching out to the store to retrieve state as necessary).
79pub struct NetworkTransactionBuilder {
80    /// The address for the network transaction builder gRPC server.
81    pub ntx_builder_address: SocketAddr,
82    /// Address of the store gRPC server.
83    pub store_url: Url,
84    /// Address of the block producer gRPC server.
85    pub block_producer_address: SocketAddr,
86    /// Address of the remote proving service. If `None`, transactions will be proven locally,
87    /// which is undesirable due to the perofmrance impact.
88    pub tx_prover_url: Option<Url>,
89    /// Interval for checking pending notes and executing network transactions.
90    pub ticker_interval: Duration,
91    /// Capacity of the in-memory account cache for the executor's data store.
92    pub account_cache_capacity: NonZeroUsize,
93}
94
95impl NetworkTransactionBuilder {
96    /// Serves the transaction builder service.
97    ///
98    /// If for any reason the service errors, it gets restarted.
99    pub async fn serve_resilient(&mut self) -> anyhow::Result<()> {
100        loop {
101            match self.serve_once().await {
102                Ok(()) => warn!(target: COMPONENT, "builder stopped without error, restarting"),
103                Err(e) => warn!(target: COMPONENT, error = %e, "builder crashed, restarting"),
104            }
105
106            // sleep before retrying to not spin the server
107            time::sleep(Duration::from_secs(1)).await;
108        }
109    }
110
111    pub async fn serve_once(&self) -> anyhow::Result<()> {
112        let store = StoreClient::new(&self.store_url);
113        let unconsumed = store.get_unconsumed_network_notes().await?;
114        let notes_queue = Arc::new(Mutex::new(PendingNotes::new(unconsumed)));
115
116        let listener = TcpListener::bind(self.ntx_builder_address).await?;
117        let server = tonic::transport::Server::builder()
118            .accept_http1(true)
119            .layer(TraceLayer::new_for_grpc())
120            .add_service(api_server::ApiServer::new(NtxBuilderApi::new(notes_queue.clone())))
121            .serve_with_incoming(TcpListenerStream::new(listener));
122        tokio::pin!(server);
123
124        let mut ticker = self.spawn_ticker(notes_queue.clone());
125
126        loop {
127            tokio::select! {
128                // gRPC server ended, shut down ticker and bubble error up
129                result = &mut server => {
130                    ticker.abort();
131                    return result.context("gRPC server stopped");
132                }
133                // ticker ended or panicked, respawn it; RPC server keeps running
134                outcome = &mut ticker => {
135                    match outcome {
136                        Ok(Ok(())) => warn!(target: COMPONENT, "ticker stopped; respawning"),
137                        Ok(Err(e)) => error!(target: COMPONENT, error=%e, "ticker errored; respawning"),
138                        Err(join_err) => error!(target: COMPONENT, error=%join_err, "ticker panicked; respawning"),
139                    }
140                    ticker = self.spawn_ticker(notes_queue.clone());
141                }
142            }
143        }
144    }
145
146    /// Spawns the ticker task and returns a handle to it.
147    ///
148    /// The ticker is in charge of periodically checking the network notes set and executing the
149    /// next set of notes.
150    fn spawn_ticker(&self, api_state: SharedPendingNotes) -> JoinHandle<anyhow::Result<()>> {
151        let store_url = self.store_url.clone();
152        let block_addr = self.block_producer_address;
153        let prover_addr = self.tx_prover_url.clone();
154        let ticker_interval = self.ticker_interval;
155
156        spawn_blocking(move || {
157            let rt = RtBuilder::new_current_thread()
158                .enable_all()
159                .build()
160                .context("failed to build runtime")?;
161
162            rt.block_on(async move {
163                info!(target: COMPONENT, "Spawned NTB ticker (ticks every {} ms)", &ticker_interval.as_millis());
164                let store = StoreClient::new(&store_url);
165                let data_store = Arc::new(NtxBuilderDataStore::new(store).await?);
166                let tx_executor = TransactionExecutor::new(data_store.clone(), None);
167                let tx_prover = NtbTransactionProver::from(prover_addr);
168                let block_prod = BlockProducerClient::new(block_addr);
169
170                let mut interval = time::interval(ticker_interval);
171
172                loop {
173                    interval.tick().await;
174
175                    let result = Self::build_network_tx(
176                        &api_state,
177                        &tx_executor,
178                        &data_store,
179                        &tx_prover,
180                        &block_prod,
181                    )
182                    .await;
183
184                    if let Err(e) = result {
185                        error!(target: COMPONENT,err=%e, "Error preparing transaction");
186                    }
187                }
188            })
189        })
190    }
191
192    /// Performs all steps to submit a proven transaction to the block producer:
193    ///
194    /// - (preflight) Gets the next tag and set of notes to consume.
195    ///   - With this, MMR peaks and the latest header is retrieved
196    ///   - The executor account is retrieved from the cache or store.
197    ///   - If the executor account is not found, the notes are **discarded** and note requeued.
198    /// - Executes, proves and submits a network transaction.
199    /// - After executing, updates the account cache with the new account state and any notes that
200    ///   were note used are requeued
201    ///
202    /// A failure on the second stage will result in the transaction being rolled back.
203    ///
204    /// ## Telemetry
205    ///
206    /// - Creates a new root span which means each transaction gets its own complete trace.
207    /// - Adds an `ntx.tag` attribute to the whole span to describe the account that will execute
208    ///   the ntx.
209    /// - Each stage has its own child span and are free to add further field data.
210    /// - A failed step on the execution stage will emit an error event, and both its own span and
211    ///   the root span will be marked as errors.
212    ///
213    /// # Errors
214    ///
215    /// - Returns an error only when the preflight stage errors. On the execution stage, errors are
216    ///   logged and the transaction gets rolled back.
217    async fn build_network_tx(
218        api_state: &SharedPendingNotes,
219        tx_executor: &TransactionExecutor,
220        data_store: &Arc<NtxBuilderDataStore>,
221        tx_prover: &NtbTransactionProver,
222        block_prod: &BlockProducerClient,
223    ) -> Result<(), NtxBuilderError> {
224        // Preflight: Look for next account and blockchain data, and select notes
225        let Some(tx_request) = Self::select_next_tx(api_state, data_store).await? else {
226            debug!(target: COMPONENT, "No notes for existing network accounts found, returning.");
227            return Ok(());
228        };
229
230        // Execution: Filter notes, execute, prove and submit tx
231        let executed_tx = Self::filter_consumable_notes(data_store,tx_executor, &tx_request)
232            .and_then(|filtered_tx_req| Self::execute_transaction(tx_executor, filtered_tx_req))
233            .and_then(|executed_tx| Self::prove_and_submit_transaction(tx_prover, block_prod, executed_tx))
234            .inspect_ok(|tx| {
235                info!(target: COMPONENT, tx_id = %tx.id(), "Proved and submitted network transaction");
236            })
237            .inspect_err(|err| {
238                warn!(target: COMPONENT, error = %err, "Error in transaction processing");
239                Span::current().set_error(err);
240            })
241            .instrument(Span::current())
242            .await;
243
244        // If execution succeeded, requeue notes we did not use and update account cache
245        if let Ok(tx) = executed_tx {
246            let executed_ids: BTreeSet<NoteId> =
247                tx.input_notes().iter().map(InputNote::id).collect();
248            let failed_notes = tx_request
249                .notes_to_execute
250                .iter()
251                .filter(|n| !executed_ids.contains(&n.id()))
252                .cloned();
253
254            api_state.lock().await.queue_unconsumed_notes(failed_notes);
255
256            data_store
257                .update_account(&tx)
258                .map_err(NtxBuilderError::AccountCacheUpdateFailed)?;
259        } else {
260            // Otherwise, roll back
261            Self::rollback_tx(tx_request, api_state, data_store).await;
262        }
263
264        Ok(())
265    }
266
267    /// Selects the next tag and set of notes to execute.
268    /// If a tag is in queue, we attempt to retrieve the account and update the datastore's partial
269    /// MMR.
270    ///
271    /// If this function errors, the notes are effectively discarded because [`Self::rollback_tx()`]
272    /// is not called.
273    async fn select_next_tx(
274        api_state: &SharedPendingNotes,
275        data_store: &Arc<NtxBuilderDataStore>,
276    ) -> Result<Option<NetworkTransactionRequest>, NtxBuilderError> {
277        let Some((tag, notes)) = api_state.lock().await.take_next_notes_by_tag() else {
278            return Ok(None);
279        };
280
281        let span = info_span!("ntx_builder.select_next_batch");
282        span.set_attribute("ntx.tag", tag.inner());
283
284        let block_num = Self::prepare_blockchain_data(data_store).await?;
285        let account_id = Self::get_account_for_ntx(data_store, tag).await?;
286
287        match account_id {
288            Some(id) => Ok(Some(NetworkTransactionRequest::new(id, block_num, notes))),
289            // No network account found for note tag, discard (notes are not requeued)
290            None => Ok(None),
291        }
292    }
293
294    /// Updates the partial blockchain and latest header within the datastore.
295    #[instrument(target = COMPONENT, name = "ntx_builder.prepare_blockchain_data", skip_all, err)]
296    async fn prepare_blockchain_data(
297        data_store: &Arc<NtxBuilderDataStore>,
298    ) -> Result<BlockNumber, StoreError> {
299        data_store.update_blockchain_data().await
300    }
301
302    /// Gets the account from the cache or from the store if it's not found in the cache.
303    #[instrument(target = COMPONENT, name = "ntx_builder.get_account_for_batch", skip_all, err)]
304    async fn get_account_for_ntx(
305        data_store: &Arc<NtxBuilderDataStore>,
306        tag: NoteTag,
307    ) -> Result<Option<AccountId>, NtxBuilderError> {
308        let account = data_store.get_cached_acc_or_fetch_by_tag(tag).await?;
309
310        let Some(account) = account else {
311            warn!(target: COMPONENT, "Network account details for tag {tag} not found in the store");
312            return Ok(None);
313        };
314
315        Ok(Some(account.id()))
316    }
317
318    /// Filters the [`NetworkTransactionRequest`]'s notes by making one consumption check against
319    /// the executing account.
320    #[instrument(target = COMPONENT, name = "ntx_builder.filter_consumable_notes", skip_all, err)]
321    async fn filter_consumable_notes(
322        data_store: &Arc<NtxBuilderDataStore>,
323        tx_executor: &TransactionExecutor,
324        tx_request: &NetworkTransactionRequest,
325    ) -> Result<NetworkTransactionRequest, NtxBuilderError> {
326        let input_notes = InputNotes::new(
327            tx_request
328                .notes_to_execute
329                .iter()
330                .cloned()
331                .map(Note::from)
332                .map(InputNote::unauthenticated)
333                .collect(),
334        )?;
335
336        for note in input_notes.iter() {
337            data_store.insert_note_script_mast(note.note().script());
338        }
339
340        let checker = NoteConsumptionChecker::new(tx_executor);
341        match checker
342            .check_notes_consumability(
343                tx_request.account_id,
344                tx_request.block_ref,
345                input_notes.clone(),
346                TransactionArgs::default(),
347                Arc::new(DefaultSourceManager::default()),
348            )
349            .await
350        {
351            Ok(NoteAccountExecution::Success) => Ok(tx_request.clone()),
352            Ok(NoteAccountExecution::Failure { successful_notes, error, failed_note_id }) => {
353                let successful_network_notes: Vec<NetworkNote> = input_notes
354                    .iter()
355                    .filter(|n| successful_notes.contains(&n.id()))
356                    .map(InputNote::note)
357                    .cloned()
358                    .map(|n| NetworkNote::try_from(n).expect("conversion should work"))
359                    .collect();
360
361                if let Some(ref err) = error {
362                    Span::current()
363                        .set_attribute("ntx.consumption_check_error", err.to_string().as_str());
364                } else {
365                    Span::current().set_attribute("ntx.consumption_check_error", "none");
366                }
367                Span::current()
368                    .set_attribute("ntx.failed_note_id", failed_note_id.to_hex().as_str());
369
370                if successful_network_notes.is_empty() {
371                    return Err(NtxBuilderError::NoteSetIsEmpty(tx_request.account_id));
372                }
373
374                Ok(NetworkTransactionRequest::new(
375                    tx_request.account_id,
376                    tx_request.block_ref,
377                    successful_network_notes,
378                ))
379            },
380            Err(err) => Err(NtxBuilderError::NoteConsumptionCheckFailed(err)),
381        }
382    }
383
384    /// Executes the transaction with the account described by the request.
385    #[instrument(target = COMPONENT, name = "ntx_builder.execute_transaction", skip_all, err)]
386    async fn execute_transaction(
387        tx_executor: &TransactionExecutor,
388        tx_request: NetworkTransactionRequest,
389    ) -> Result<ExecutedTransaction, NtxBuilderError> {
390        let input_notes = InputNotes::new(
391            tx_request
392                .notes_to_execute
393                .iter()
394                .cloned()
395                .map(Note::from)
396                .map(InputNote::unauthenticated)
397                .collect(),
398        )?;
399
400        tx_executor
401            .execute_transaction(
402                tx_request.account_id,
403                tx_request.block_ref,
404                input_notes,
405                TransactionArgs::default(),
406                Arc::new(DefaultSourceManager::default()),
407            )
408            .await
409            .map_err(NtxBuilderError::ExecutionError)
410    }
411
412    /// Proves the transaction and submits it to the mempool.
413    #[instrument(target = COMPONENT, name = "ntx_builder.prove_and_submit_transaction", skip_all, err)]
414    async fn prove_and_submit_transaction(
415        tx_prover: &NtbTransactionProver,
416        block_prod: &BlockProducerClient,
417        executed_tx: ExecutedTransaction,
418    ) -> Result<ExecutedTransaction, NtxBuilderError> {
419        tx_prover.prove_and_submit(block_prod, &executed_tx).await?;
420
421        Ok(executed_tx)
422    }
423
424    /// Rolls back the transaction. This should be executed if the execution stage of the pipeline
425    /// failed. Specifically, this involves requeuing notes and evicting the account from the
426    /// cache.
427    #[instrument(target = COMPONENT, name = "ntx_builder.rollback_tx", skip_all)]
428    async fn rollback_tx(
429        tx_request: NetworkTransactionRequest,
430        api_state: &SharedPendingNotes,
431        data_store: &Arc<NtxBuilderDataStore>,
432    ) {
433        // Roll back any state changes and re-queue notes if needed
434        data_store.evict_account(tx_request.account_id);
435
436        api_state.lock().await.queue_unconsumed_notes(tx_request.notes_to_execute);
437    }
438}
439
440// BUILDER ERRORS
441// =================================================================================================
442
443#[derive(Debug, Error)]
444pub enum NtxBuilderError {
445    #[error("account cache update error")]
446    AccountCacheUpdateFailed(#[from] AccountError),
447    #[error("store error")]
448    Store(#[from] StoreError),
449    #[error("transaction inputs error")]
450    TransactionInputError(#[from] TransactionInputError),
451    #[error("transaction execution error")]
452    ExecutionError(#[source] TransactionExecutorError),
453    #[error("error while checking for note consumption compatibility")]
454    NoteConsumptionCheckFailed(#[source] TransactionExecutorError),
455    #[error("after performing a consumption check for account, the note list became empty")]
456    NoteSetIsEmpty(AccountId),
457    #[error("block producer client error")]
458    BlockProducer(#[from] tonic::Status),
459    #[error("network account error")]
460    NetworkAccount(#[from] NetworkAccountError),
461    #[error("error while proving transaction")]
462    ProverError(#[from] TransactionProverError),
463    #[error("error while proving transaction")]
464    ProofSubmissionFailed(#[source] tonic::Status),
465}