miden_node_ntx_builder/builder/
mod.rs1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::Context;
6use futures::TryStreamExt;
7use miden_node_proto::domain::account::NetworkAccountPrefix;
8use miden_node_utils::ErrorReport;
9use miden_remote_prover_client::remote_prover::tx_prover::RemoteTransactionProver;
10use tokio::sync::Barrier;
11use tokio::time;
12use url::Url;
13
14use crate::MAX_IN_PROGRESS_TXS;
15use crate::block_producer::BlockProducerClient;
16use crate::store::StoreClient;
17
18pub struct NetworkTransactionBuilder {
28 store_url: Url,
30 block_producer_url: Url,
32 tx_prover_url: Option<Url>,
35 ticker_interval: Duration,
37 bp_checkpoint: Arc<Barrier>,
42}
43
44impl NetworkTransactionBuilder {
45 pub fn new(
47 store_url: Url,
48 block_producer_url: Url,
49 tx_prover_url: Option<Url>,
50 ticker_interval: Duration,
51 bp_checkpoint: Arc<Barrier>,
52 ) -> Self {
53 Self {
54 store_url,
55 block_producer_url,
56 tx_prover_url,
57 ticker_interval,
58 bp_checkpoint,
59 }
60 }
61
62 pub async fn serve_new(self) -> anyhow::Result<()> {
63 let store = StoreClient::new(self.store_url);
64 let block_producer = BlockProducerClient::new(self.block_producer_url);
65
66 let mut state = crate::state::State::load(store.clone())
67 .await
68 .context("failed to load ntx state")?;
69
70 let mut mempool_events = block_producer
71 .subscribe_to_mempool_with_retry(state.chain_tip())
72 .await
73 .context("failed to subscribe to mempool events")?;
74
75 self.bp_checkpoint.wait().await;
80
81 let prover = self.tx_prover_url.map(RemoteTransactionProver::new);
82
83 let mut interval = tokio::time::interval(self.ticker_interval);
84 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
85
86 let mut inflight = JoinSet::new();
91 let mut inflight_idx = HashMap::new();
92
93 let context = crate::transaction::NtxContext {
94 block_producer: block_producer.clone(),
95 prover,
96 store,
97 };
98
99 loop {
100 tokio::select! {
101 _next = interval.tick() => {
102 if inflight.len() > MAX_IN_PROGRESS_TXS {
103 tracing::info!("At maximum network tx capacity, skipping");
104 continue;
105 }
106
107 let Some(candidate) = state.select_candidate(crate::MAX_NOTES_PER_TX) else {
108 tracing::debug!("No candidate network transaction available");
109 continue;
110 };
111
112 let network_account_prefix = NetworkAccountPrefix::try_from(candidate.account.id())
113 .expect("all accounts managed by NTB are network accounts");
114 let indexed_candidate = (network_account_prefix, candidate.chain_tip_header.block_num());
115 let task_id = inflight.spawn({
116 let context = context.clone();
117 context.execute_transaction(candidate)
118 }).id();
119
120 inflight_idx.insert(task_id, indexed_candidate);
122 },
123 event = mempool_events.try_next() => {
124 let event = event
125 .context("mempool event stream ended")?
126 .context("mempool event stream failed")?;
127 state.mempool_update(event).await.context("failed to update state")?;
128 },
129 completed = inflight.join_next_with_id() => {
130 let task_id = match &completed {
132 Ok((task_id, _)) => *task_id,
133 Err(join_handle) => join_handle.id(),
134 };
135 let (candidate, block_num) = inflight_idx.remove(&task_id).unwrap();
137
138 match completed {
139 Ok((_, Ok(failed))) => {
141 let notes = failed.into_iter().map(|note| note.note).collect::<Vec<_>>();
142 state.notes_failed(candidate, notes.as_slice(), block_num);
143 },
144 Ok((_, Err((notes, err)))) => {
146 tracing::warn!(err=err.as_report(), "network transaction failed");
147 state.notes_failed(candidate, notes.as_slice(), block_num);
149
150 state.candidate_failed(candidate);
151 },
152 Err(err) => {
154 tracing::warn!(err=err.as_report(), "network transaction panicked");
155 state.candidate_failed(candidate);
156 }
157 }
158 }
159 }
160 }
161 }
162}
163
164struct JoinSet<T>(tokio::task::JoinSet<T>);
169
170impl<T> JoinSet<T>
171where
172 T: 'static,
173{
174 fn new() -> Self {
175 Self(tokio::task::JoinSet::new())
176 }
177
178 fn spawn<F>(&mut self, task: F) -> tokio::task::AbortHandle
179 where
180 F: Future<Output = T>,
181 F: Send + 'static,
182 T: Send,
183 {
184 self.0.spawn(task)
185 }
186
187 async fn join_next_with_id(&mut self) -> Result<(tokio::task::Id, T), tokio::task::JoinError> {
188 if self.0.is_empty() {
189 std::future::pending().await
190 } else {
191 self.0.join_next_with_id().await.unwrap()
193 }
194 }
195
196 fn len(&self) -> usize {
197 self.0.len()
198 }
199}