miden_node_ntx_builder/builder/
mod.rs1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::Context;
6use futures::TryStreamExt;
7use miden_node_proto::domain::account::NetworkAccountPrefix;
8use miden_node_utils::ErrorReport;
9use miden_remote_prover_client::remote_prover::tx_prover::RemoteTransactionProver;
10use tokio::sync::Barrier;
11use tokio::time;
12use url::Url;
13
14use crate::MAX_IN_PROGRESS_TXS;
15use crate::block_producer::BlockProducerClient;
16use crate::store::StoreClient;
17use crate::transaction::NtxError;
18
19pub struct NetworkTransactionBuilder {
29 store_url: Url,
31 block_producer_url: Url,
33 tx_prover_url: Option<Url>,
36 ticker_interval: Duration,
38 bp_checkpoint: Arc<Barrier>,
43}
44
45impl NetworkTransactionBuilder {
46 pub fn new(
48 store_url: Url,
49 block_producer_url: Url,
50 tx_prover_url: Option<Url>,
51 ticker_interval: Duration,
52 bp_checkpoint: Arc<Barrier>,
53 ) -> Self {
54 Self {
55 store_url,
56 block_producer_url,
57 tx_prover_url,
58 ticker_interval,
59 bp_checkpoint,
60 }
61 }
62
63 pub async fn serve_new(self) -> anyhow::Result<()> {
64 let store = StoreClient::new(self.store_url);
65 let block_producer = BlockProducerClient::new(self.block_producer_url);
66
67 let mut state = crate::state::State::load(store.clone())
68 .await
69 .context("failed to load ntx state")?;
70
71 let mut mempool_events = block_producer
72 .subscribe_to_mempool_with_retry(state.chain_tip())
73 .await
74 .context("failed to subscribe to mempool events")?;
75
76 self.bp_checkpoint.wait().await;
81
82 let prover = self.tx_prover_url.map(RemoteTransactionProver::new);
83
84 let mut interval = tokio::time::interval(self.ticker_interval);
85 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
86
87 let mut inflight = JoinSet::new();
92 let mut inflight_idx = HashMap::new();
93
94 let context = crate::transaction::NtxContext {
95 block_producer: block_producer.clone(),
96 prover,
97 store,
98 };
99
100 loop {
101 tokio::select! {
102 _next = interval.tick() => {
103 if inflight.len() > MAX_IN_PROGRESS_TXS {
104 tracing::info!("At maximum network tx capacity, skipping");
105 continue;
106 }
107
108 let Some(candidate) = state.select_candidate(crate::MAX_NOTES_PER_TX) else {
109 tracing::debug!("No candidate network transaction available");
110 continue;
111 };
112
113 let network_account_prefix = NetworkAccountPrefix::try_from(candidate.account.id())
114 .expect("all accounts managed by NTB are network accounts");
115 let indexed_candidate = (network_account_prefix, candidate.chain_tip_header.block_num());
116 let task_id = inflight.spawn({
117 let context = context.clone();
118 context.execute_transaction(candidate)
119 }).id();
120
121 inflight_idx.insert(task_id, indexed_candidate);
123 },
124 event = mempool_events.try_next() => {
125 let event = event
126 .context("mempool event stream ended")?
127 .context("mempool event stream failed")?;
128 state.mempool_update(event).await.context("failed to update state")?;
129 },
130 completed = inflight.join_next_with_id() => {
131 let task_id = match &completed {
133 Ok((task_id, _)) => *task_id,
134 Err(join_handle) => join_handle.id(),
135 };
136 let (candidate, block_num) = inflight_idx.remove(&task_id).unwrap();
138
139 match completed {
140 Ok((_, Ok(failed))) => {
142 let notes = failed.into_iter().map(|note| note.note).collect::<Vec<_>>();
143 state.notes_failed(candidate, notes.as_slice(), block_num);
144 },
145 Ok((_, Err(err))) => {
147 tracing::warn!(err=err.as_report(), "network transaction failed");
148 match err {
149 NtxError::AllNotesFailed(failed) => {
150 let notes = failed.into_iter().map(|note| note.note).collect::<Vec<_>>();
151 state.notes_failed(candidate, notes.as_slice(), block_num);
152 },
153 NtxError::InputNotes(_)
154 | NtxError::NoteFilter(_)
155 | NtxError::Execution(_)
156 | NtxError::Proving(_)
157 | NtxError::Submission(_)
158 | NtxError::Panic(_) => {},
159 }
160 state.candidate_failed(candidate);
161 },
162 Err(err) => {
164 tracing::warn!(err=err.as_report(), "network transaction panicked");
165 state.candidate_failed(candidate);
166 }
167 }
168 }
169 }
170 }
171 }
172}
173
174struct JoinSet<T>(tokio::task::JoinSet<T>);
179
180impl<T> JoinSet<T>
181where
182 T: 'static,
183{
184 fn new() -> Self {
185 Self(tokio::task::JoinSet::new())
186 }
187
188 fn spawn<F>(&mut self, task: F) -> tokio::task::AbortHandle
189 where
190 F: Future<Output = T>,
191 F: Send + 'static,
192 T: Send,
193 {
194 self.0.spawn(task)
195 }
196
197 async fn join_next_with_id(&mut self) -> Result<(tokio::task::Id, T), tokio::task::JoinError> {
198 if self.0.is_empty() {
199 std::future::pending().await
200 } else {
201 self.0.join_next_with_id().await.unwrap()
203 }
204 }
205
206 fn len(&self) -> usize {
207 self.0.len()
208 }
209}