miden_node_ntx_builder/builder/
mod.rs1use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
2
3use anyhow::Context;
4use futures::TryStreamExt;
5use miden_node_proto::domain::account::NetworkAccountPrefix;
6use miden_node_utils::ErrorReport;
7use miden_remote_prover_client::remote_prover::tx_prover::RemoteTransactionProver;
8use tokio::{sync::Barrier, time};
9use url::Url;
10
11use crate::{MAX_IN_PROGRESS_TXS, block_producer::BlockProducerClient, store::StoreClient};
12
13pub struct NetworkTransactionBuilder {
23 pub store_url: Url,
25 pub block_producer_address: SocketAddr,
27 pub tx_prover_url: Option<Url>,
30 pub ticker_interval: Duration,
32 pub bp_checkpoint: Arc<Barrier>,
37}
38
39impl NetworkTransactionBuilder {
40 pub async fn serve_new(self) -> anyhow::Result<()> {
41 let store = StoreClient::new(&self.store_url);
42 let block_producer = BlockProducerClient::new(self.block_producer_address);
43
44 let genesis_header = store
46 .genesis_header_with_retry()
47 .await
48 .context("failed to fetch genesis header")?;
49
50 let mut state = crate::state::State::load(store.clone())
51 .await
52 .context("failed to load ntx state")?;
53
54 let (chain_tip, _mmr) = store
55 .get_current_blockchain_data(None)
56 .await
57 .context("failed to fetch the chain tip data from the store")?
58 .context("chain tip data was None")?;
59
60 let mut mempool_events = block_producer
61 .subscribe_to_mempool_with_retry(chain_tip.block_num())
62 .await
63 .context("failed to subscribe to mempool events")?;
64
65 self.bp_checkpoint.wait().await;
70
71 let prover = self.tx_prover_url.map(RemoteTransactionProver::new);
72
73 let mut interval = tokio::time::interval(self.ticker_interval);
74 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
75
76 let mut inflight = JoinSet::new();
81 let mut inflight_idx = HashMap::new();
82
83 let context = crate::transaction::NtxContext {
84 block_producer: block_producer.clone(),
85 genesis_header,
86 prover,
87 };
88
89 loop {
90 tokio::select! {
91 _next = interval.tick() => {
92 if inflight.len() > MAX_IN_PROGRESS_TXS {
93 tracing::info!("At maximum network tx capacity, skipping");
94 continue;
95 }
96
97 let Some(candidate) = state.select_candidate(crate::MAX_NOTES_PER_TX) else {
98 tracing::debug!("No candidate network transaction available");
99 continue;
100 };
101
102 let prefix = NetworkAccountPrefix::try_from(candidate.account.id()).unwrap();
103 let task_id = inflight.spawn({
104 let context = context.clone();
105 context.execute_transaction(candidate)
106 }).id();
107
108 inflight_idx.insert(task_id, prefix);
110 },
111 event = mempool_events.try_next() => {
112 let event = event
113 .context("mempool event stream ended")?
114 .context("mempool event stream failed")?;
115 state.mempool_update(event).await.context("failed to update state")?;
116 },
117 completed = inflight.join_next_with_id() => {
118 let task_id = match &completed {
120 Ok((task_id, _)) => *task_id,
121 Err(join_handle) => join_handle.id(),
122 };
123 let candidate = inflight_idx.remove(&task_id).unwrap();
125
126 match completed {
127 Ok((_, Ok(_))) => {},
129 Ok((_, Err(err))) => {
131 tracing::warn!(err=err.as_report(), "network transaction failed");
132 state.candidate_failed(candidate);
133 },
134 Err(err) => {
135 tracing::warn!(err=err.as_report(), "network transaction panic'd");
136 state.candidate_failed(candidate);
137 }
138 }
139 }
140 }
141 }
142 }
143}
144
145struct JoinSet<T>(tokio::task::JoinSet<T>);
150
151impl<T> JoinSet<T>
152where
153 T: 'static,
154{
155 fn new() -> Self {
156 Self(tokio::task::JoinSet::new())
157 }
158
159 fn spawn<F>(&mut self, task: F) -> tokio::task::AbortHandle
160 where
161 F: Future<Output = T>,
162 F: Send + 'static,
163 T: Send,
164 {
165 self.0.spawn(task)
166 }
167
168 async fn join_next_with_id(&mut self) -> Result<(tokio::task::Id, T), tokio::task::JoinError> {
169 if self.0.is_empty() {
170 std::future::pending().await
171 } else {
172 self.0.join_next_with_id().await.unwrap()
174 }
175 }
176
177 fn len(&self) -> usize {
178 self.0.len()
179 }
180}