1use std::{collections::BTreeSet, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};
2
3use anyhow::Context;
4use block_producer::BlockProducerClient;
5use data_store::NtxBuilderDataStore;
6use futures::TryFutureExt;
7use miden_node_proto::{
8 domain::{account::NetworkAccountError, note::NetworkNote},
9 generated::ntx_builder::api_server,
10};
11use miden_node_utils::tracing::OpenTelemetrySpanExt;
12use miden_objects::{
13 AccountError, TransactionInputError,
14 account::AccountId,
15 assembly::DefaultSourceManager,
16 block::BlockNumber,
17 note::{Note, NoteId, NoteTag},
18 transaction::{ExecutedTransaction, InputNote, InputNotes, TransactionArgs},
19};
20use miden_tx::{
21 NoteAccountExecution, NoteConsumptionChecker, TransactionExecutor, TransactionExecutorError,
22 TransactionProverError,
23};
24use prover::NtbTransactionProver;
25use server::{NtxBuilderApi, PendingNotes};
26use store::{StoreClient, StoreError};
27use thiserror::Error;
28use tokio::{
29 net::TcpListener,
30 runtime::Builder as RtBuilder,
31 sync::Mutex,
32 task::{JoinHandle, spawn_blocking},
33 time,
34};
35use tokio_stream::wrappers::TcpListenerStream;
36use tower_http::trace::TraceLayer;
37use tracing::{Instrument, Span, debug, error, info, info_span, instrument, warn};
38use url::Url;
39
40use crate::COMPONENT;
41
42mod block_producer;
43mod data_store;
44mod prover;
45mod server;
46mod store;
47
48type SharedPendingNotes = Arc<Mutex<PendingNotes>>;
49
50#[derive(Clone, Debug)]
54struct NetworkTransactionRequest {
55 pub account_id: AccountId,
56 pub block_ref: BlockNumber,
57 pub notes_to_execute: Vec<NetworkNote>,
58}
59
60impl NetworkTransactionRequest {
61 fn new(account_id: AccountId, block_ref: BlockNumber, notes: Vec<NetworkNote>) -> Self {
62 Self {
63 account_id,
64 block_ref,
65 notes_to_execute: notes,
66 }
67 }
68}
69
70pub struct NetworkTransactionBuilder {
80 pub ntx_builder_address: SocketAddr,
82 pub store_url: Url,
84 pub block_producer_address: SocketAddr,
86 pub tx_prover_url: Option<Url>,
89 pub ticker_interval: Duration,
91 pub account_cache_capacity: NonZeroUsize,
93}
94
95impl NetworkTransactionBuilder {
96 pub async fn serve_resilient(&mut self) -> anyhow::Result<()> {
100 loop {
101 match self.serve_once().await {
102 Ok(()) => warn!(target: COMPONENT, "builder stopped without error, restarting"),
103 Err(e) => warn!(target: COMPONENT, error = %e, "builder crashed, restarting"),
104 }
105
106 time::sleep(Duration::from_secs(1)).await;
108 }
109 }
110
111 pub async fn serve_once(&self) -> anyhow::Result<()> {
112 let store = StoreClient::new(&self.store_url);
113 let unconsumed = store.get_unconsumed_network_notes().await?;
114 let notes_queue = Arc::new(Mutex::new(PendingNotes::new(unconsumed)));
115
116 let listener = TcpListener::bind(self.ntx_builder_address).await?;
117 let server = tonic::transport::Server::builder()
118 .accept_http1(true)
119 .layer(TraceLayer::new_for_grpc())
120 .add_service(api_server::ApiServer::new(NtxBuilderApi::new(notes_queue.clone())))
121 .serve_with_incoming(TcpListenerStream::new(listener));
122 tokio::pin!(server);
123
124 let mut ticker = self.spawn_ticker(notes_queue.clone());
125
126 loop {
127 tokio::select! {
128 result = &mut server => {
130 ticker.abort();
131 return result.context("gRPC server stopped");
132 }
133 outcome = &mut ticker => {
135 match outcome {
136 Ok(Ok(())) => warn!(target: COMPONENT, "ticker stopped; respawning"),
137 Ok(Err(e)) => error!(target: COMPONENT, error=%e, "ticker errored; respawning"),
138 Err(join_err) => error!(target: COMPONENT, error=%join_err, "ticker panicked; respawning"),
139 }
140 ticker = self.spawn_ticker(notes_queue.clone());
141 }
142 }
143 }
144 }
145
146 fn spawn_ticker(&self, api_state: SharedPendingNotes) -> JoinHandle<anyhow::Result<()>> {
151 let store_url = self.store_url.clone();
152 let block_addr = self.block_producer_address;
153 let prover_addr = self.tx_prover_url.clone();
154 let ticker_interval = self.ticker_interval;
155
156 spawn_blocking(move || {
157 let rt = RtBuilder::new_current_thread()
158 .enable_all()
159 .build()
160 .context("failed to build runtime")?;
161
162 rt.block_on(async move {
163 info!(target: COMPONENT, "Spawned NTB ticker (ticks every {} ms)", &ticker_interval.as_millis());
164 let store = StoreClient::new(&store_url);
165 let data_store = Arc::new(NtxBuilderDataStore::new(store).await?);
166 let tx_executor = TransactionExecutor::new(data_store.clone(), None);
167 let tx_prover = NtbTransactionProver::from(prover_addr);
168 let block_prod = BlockProducerClient::new(block_addr);
169
170 let mut interval = time::interval(ticker_interval);
171
172 loop {
173 interval.tick().await;
174
175 let result = Self::build_network_tx(
176 &api_state,
177 &tx_executor,
178 &data_store,
179 &tx_prover,
180 &block_prod,
181 )
182 .await;
183
184 if let Err(e) = result {
185 error!(target: COMPONENT,err=%e, "Error preparing transaction");
186 }
187 }
188 })
189 })
190 }
191
192 async fn build_network_tx(
218 api_state: &SharedPendingNotes,
219 tx_executor: &TransactionExecutor,
220 data_store: &Arc<NtxBuilderDataStore>,
221 tx_prover: &NtbTransactionProver,
222 block_prod: &BlockProducerClient,
223 ) -> Result<(), NtxBuilderError> {
224 let Some(tx_request) = Self::select_next_tx(api_state, data_store).await? else {
226 debug!(target: COMPONENT, "No notes for existing network accounts found, returning.");
227 return Ok(());
228 };
229
230 let executed_tx = Self::filter_consumable_notes(data_store,tx_executor, &tx_request)
232 .and_then(|filtered_tx_req| Self::execute_transaction(tx_executor, filtered_tx_req))
233 .and_then(|executed_tx| Self::prove_and_submit_transaction(tx_prover, block_prod, executed_tx))
234 .inspect_ok(|tx| {
235 info!(target: COMPONENT, tx_id = %tx.id(), "Proved and submitted network transaction");
236 })
237 .inspect_err(|err| {
238 warn!(target: COMPONENT, error = %err, "Error in transaction processing");
239 Span::current().set_error(err);
240 })
241 .instrument(Span::current())
242 .await;
243
244 if let Ok(tx) = executed_tx {
246 let executed_ids: BTreeSet<NoteId> =
247 tx.input_notes().iter().map(InputNote::id).collect();
248 let failed_notes = tx_request
249 .notes_to_execute
250 .iter()
251 .filter(|n| !executed_ids.contains(&n.id()))
252 .cloned();
253
254 api_state.lock().await.queue_unconsumed_notes(failed_notes);
255
256 data_store
257 .update_account(&tx)
258 .map_err(NtxBuilderError::AccountCacheUpdateFailed)?;
259 } else {
260 Self::rollback_tx(tx_request, api_state, data_store).await;
262 }
263
264 Ok(())
265 }
266
267 async fn select_next_tx(
274 api_state: &SharedPendingNotes,
275 data_store: &Arc<NtxBuilderDataStore>,
276 ) -> Result<Option<NetworkTransactionRequest>, NtxBuilderError> {
277 let Some((tag, notes)) = api_state.lock().await.take_next_notes_by_tag() else {
278 return Ok(None);
279 };
280
281 let span = info_span!("ntx_builder.select_next_batch");
282 span.set_attribute("ntx.tag", tag.inner());
283
284 let block_num = Self::prepare_blockchain_data(data_store).await?;
285 let account_id = Self::get_account_for_ntx(data_store, tag).await?;
286
287 match account_id {
288 Some(id) => Ok(Some(NetworkTransactionRequest::new(id, block_num, notes))),
289 None => Ok(None),
291 }
292 }
293
294 #[instrument(target = COMPONENT, name = "ntx_builder.prepare_blockchain_data", skip_all, err)]
296 async fn prepare_blockchain_data(
297 data_store: &Arc<NtxBuilderDataStore>,
298 ) -> Result<BlockNumber, StoreError> {
299 data_store.update_blockchain_data().await
300 }
301
302 #[instrument(target = COMPONENT, name = "ntx_builder.get_account_for_batch", skip_all, err)]
304 async fn get_account_for_ntx(
305 data_store: &Arc<NtxBuilderDataStore>,
306 tag: NoteTag,
307 ) -> Result<Option<AccountId>, NtxBuilderError> {
308 let account = data_store.get_cached_acc_or_fetch_by_tag(tag).await?;
309
310 let Some(account) = account else {
311 warn!(target: COMPONENT, "Network account details for tag {tag} not found in the store");
312 return Ok(None);
313 };
314
315 Ok(Some(account.id()))
316 }
317
318 #[instrument(target = COMPONENT, name = "ntx_builder.filter_consumable_notes", skip_all, err)]
321 async fn filter_consumable_notes(
322 data_store: &Arc<NtxBuilderDataStore>,
323 tx_executor: &TransactionExecutor,
324 tx_request: &NetworkTransactionRequest,
325 ) -> Result<NetworkTransactionRequest, NtxBuilderError> {
326 let input_notes = InputNotes::new(
327 tx_request
328 .notes_to_execute
329 .iter()
330 .cloned()
331 .map(Note::from)
332 .map(InputNote::unauthenticated)
333 .collect(),
334 )?;
335
336 for note in input_notes.iter() {
337 data_store.insert_note_script_mast(note.note().script());
338 }
339
340 let checker = NoteConsumptionChecker::new(tx_executor);
341 match checker
342 .check_notes_consumability(
343 tx_request.account_id,
344 tx_request.block_ref,
345 input_notes.clone(),
346 TransactionArgs::default(),
347 Arc::new(DefaultSourceManager::default()),
348 )
349 .await
350 {
351 Ok(NoteAccountExecution::Success) => Ok(tx_request.clone()),
352 Ok(NoteAccountExecution::Failure { successful_notes, error, failed_note_id }) => {
353 let successful_network_notes: Vec<NetworkNote> = input_notes
354 .iter()
355 .filter(|n| successful_notes.contains(&n.id()))
356 .map(InputNote::note)
357 .cloned()
358 .map(|n| NetworkNote::try_from(n).expect("conversion should work"))
359 .collect();
360
361 if let Some(ref err) = error {
362 Span::current()
363 .set_attribute("ntx.consumption_check_error", err.to_string().as_str());
364 } else {
365 Span::current().set_attribute("ntx.consumption_check_error", "none");
366 }
367 Span::current()
368 .set_attribute("ntx.failed_note_id", failed_note_id.to_hex().as_str());
369
370 if successful_network_notes.is_empty() {
371 return Err(NtxBuilderError::NoteSetIsEmpty(tx_request.account_id));
372 }
373
374 Ok(NetworkTransactionRequest::new(
375 tx_request.account_id,
376 tx_request.block_ref,
377 successful_network_notes,
378 ))
379 },
380 Err(err) => Err(NtxBuilderError::NoteConsumptionCheckFailed(err)),
381 }
382 }
383
384 #[instrument(target = COMPONENT, name = "ntx_builder.execute_transaction", skip_all, err)]
386 async fn execute_transaction(
387 tx_executor: &TransactionExecutor,
388 tx_request: NetworkTransactionRequest,
389 ) -> Result<ExecutedTransaction, NtxBuilderError> {
390 let input_notes = InputNotes::new(
391 tx_request
392 .notes_to_execute
393 .iter()
394 .cloned()
395 .map(Note::from)
396 .map(InputNote::unauthenticated)
397 .collect(),
398 )?;
399
400 tx_executor
401 .execute_transaction(
402 tx_request.account_id,
403 tx_request.block_ref,
404 input_notes,
405 TransactionArgs::default(),
406 Arc::new(DefaultSourceManager::default()),
407 )
408 .await
409 .map_err(NtxBuilderError::ExecutionError)
410 }
411
412 #[instrument(target = COMPONENT, name = "ntx_builder.prove_and_submit_transaction", skip_all, err)]
414 async fn prove_and_submit_transaction(
415 tx_prover: &NtbTransactionProver,
416 block_prod: &BlockProducerClient,
417 executed_tx: ExecutedTransaction,
418 ) -> Result<ExecutedTransaction, NtxBuilderError> {
419 tx_prover.prove_and_submit(block_prod, &executed_tx).await?;
420
421 Ok(executed_tx)
422 }
423
424 #[instrument(target = COMPONENT, name = "ntx_builder.rollback_tx", skip_all)]
428 async fn rollback_tx(
429 tx_request: NetworkTransactionRequest,
430 api_state: &SharedPendingNotes,
431 data_store: &Arc<NtxBuilderDataStore>,
432 ) {
433 data_store.evict_account(tx_request.account_id);
435
436 api_state.lock().await.queue_unconsumed_notes(tx_request.notes_to_execute);
437 }
438}
439
440#[derive(Debug, Error)]
444pub enum NtxBuilderError {
445 #[error("account cache update error")]
446 AccountCacheUpdateFailed(#[from] AccountError),
447 #[error("store error")]
448 Store(#[from] StoreError),
449 #[error("transaction inputs error")]
450 TransactionInputError(#[from] TransactionInputError),
451 #[error("transaction execution error")]
452 ExecutionError(#[source] TransactionExecutorError),
453 #[error("error while checking for note consumption compatibility")]
454 NoteConsumptionCheckFailed(#[source] TransactionExecutorError),
455 #[error("after performing a consumption check for account, the note list became empty")]
456 NoteSetIsEmpty(AccountId),
457 #[error("block producer client error")]
458 BlockProducer(#[from] tonic::Status),
459 #[error("network account error")]
460 NetworkAccount(#[from] NetworkAccountError),
461 #[error("error while proving transaction")]
462 ProverError(#[from] TransactionProverError),
463 #[error("error while proving transaction")]
464 ProofSubmissionFailed(#[source] tonic::Status),
465}