miden_node_ntx_builder/
builder.rs1use std::num::NonZeroUsize;
2use std::sync::Arc;
3
4use anyhow::Context;
5use futures::TryStreamExt;
6use miden_node_proto::domain::account::NetworkAccountId;
7use miden_node_proto::domain::mempool::MempoolEvent;
8use miden_node_utils::lru_cache::LruCache;
9use miden_protocol::Word;
10use miden_protocol::account::delta::AccountUpdateDetails;
11use miden_protocol::block::BlockHeader;
12use miden_protocol::crypto::merkle::mmr::PartialMmr;
13use miden_protocol::note::NoteScript;
14use miden_protocol::transaction::PartialBlockchain;
15use miden_remote_prover_client::remote_prover::tx_prover::RemoteTransactionProver;
16use tokio::sync::{RwLock, mpsc};
17use url::Url;
18
19use crate::MAX_IN_PROGRESS_TXS;
20use crate::actor::{AccountActorContext, AccountOrigin};
21use crate::block_producer::BlockProducerClient;
22use crate::coordinator::Coordinator;
23use crate::store::StoreClient;
24use crate::validator::ValidatorClient;
25
/// Block-count window used when pruning the in-memory partial blockchain.
///
/// `NetworkTransactionBuilder::update_chain_tip` subtracts this value from the chain length
/// (saturating at zero) and prunes the MMR up to the resulting height — presumably leaving
/// roughly this many of the latest blocks tracked; confirm against
/// `PartialBlockchain::prune_to`.
const MAX_BLOCK_COUNT: usize = 4;
31
/// The view of the chain tracked by this module: the current tip header plus a partial
/// blockchain (MMR).
///
/// In this file it is wrapped in `Arc<RwLock<_>>`, handed to [`AccountActorContext`], and
/// mutated by `NetworkTransactionBuilder::update_chain_tip` whenever a block is committed.
#[derive(Debug, Clone)]
pub struct ChainState {
    /// Header of the most recent block known to the builder.
    pub chain_tip_header: BlockHeader,
    /// Partial blockchain built from a [`PartialMmr`]. On every tip update the *previous* tip
    /// header is added to it before the new tip is installed, so during updates it trails
    /// `chain_tip_header` by one block.
    pub chain_mmr: PartialBlockchain,
}
44
45impl ChainState {
46 fn new(chain_tip_header: BlockHeader, chain_mmr: PartialMmr) -> Self {
48 let chain_mmr = PartialBlockchain::new(chain_mmr, [])
49 .expect("partial blockchain should build from partial mmr");
50 Self { chain_tip_header, chain_mmr }
51 }
52
53 pub fn into_parts(self) -> (BlockHeader, PartialBlockchain) {
56 (self.chain_tip_header, self.chain_mmr)
57 }
58}
59
/// Drives network transaction building: subscribes to mempool events, keeps the shared
/// [`ChainState`] up to date, and manages per-account actors through a [`Coordinator`].
///
/// Construct it with [`NetworkTransactionBuilder::new`] and drive it with
/// [`NetworkTransactionBuilder::run`].
pub struct NetworkTransactionBuilder {
    /// URL used to construct the [`StoreClient`].
    store_url: Url,
    /// URL used to construct the [`BlockProducerClient`] (mempool subscription).
    block_producer_url: Url,
    /// URL used to construct the [`ValidatorClient`].
    validator_url: Url,
    /// URL of the remote transaction prover. When `None`, no [`RemoteTransactionProver`] is
    /// created and the actors receive `None` as their prover — presumably they then prove
    /// locally; confirm in the actor implementation.
    tx_prover_url: Option<Url>,
    /// LRU cache of note scripts keyed by [`Word`] (presumably the script root — confirm).
    /// A clone is handed to every actor through [`AccountActorContext`].
    script_cache: LruCache<Word, NoteScript>,
    /// Spawns account actors and routes mempool events to them.
    coordinator: Coordinator,
}
87
88impl NetworkTransactionBuilder {
89 const ACCOUNT_CHANNEL_CAPACITY: usize = 1_000;
91
92 pub fn new(
94 store_url: Url,
95 block_producer_url: Url,
96 validator_url: Url,
97 tx_prover_url: Option<Url>,
98 script_cache_size: NonZeroUsize,
99 ) -> Self {
100 let script_cache = LruCache::new(script_cache_size);
101 let coordinator = Coordinator::new(MAX_IN_PROGRESS_TXS);
102 Self {
103 store_url,
104 block_producer_url,
105 validator_url,
106 tx_prover_url,
107 script_cache,
108 coordinator,
109 }
110 }
111
112 pub async fn run(mut self) -> anyhow::Result<()> {
114 let store = StoreClient::new(self.store_url.clone());
115 let block_producer = BlockProducerClient::new(self.block_producer_url.clone());
116
117 let (chain_tip_header, chain_mmr, mut mempool_events) = loop {
126 let (chain_tip_header, chain_mmr) = store
127 .get_latest_blockchain_data_with_retry()
128 .await?
129 .expect("store should contain a latest block");
130
131 match block_producer
132 .subscribe_to_mempool_with_retry(chain_tip_header.block_num())
133 .await
134 {
135 Ok(subscription) => break (chain_tip_header, chain_mmr, subscription),
136 Err(status) if status.code() == tonic::Code::InvalidArgument => {
137 tracing::error!(err=%status, "mempool subscription failed due to desync, trying again");
138 },
139 Err(err) => return Err(err).context("failed to subscribe to mempool events"),
140 }
141 };
142
143 let chain_state = Arc::new(RwLock::new(ChainState::new(chain_tip_header, chain_mmr)));
145
146 let validator = ValidatorClient::new(self.validator_url.clone());
147 let prover = self.tx_prover_url.clone().map(RemoteTransactionProver::new);
148
149 let actor_context = AccountActorContext {
150 block_producer: block_producer.clone(),
151 validator,
152 prover,
153 chain_state: chain_state.clone(),
154 store: store.clone(),
155 script_cache: self.script_cache.clone(),
156 };
157
158 let (account_tx, mut account_rx) =
161 mpsc::channel::<NetworkAccountId>(Self::ACCOUNT_CHANNEL_CAPACITY);
162 let account_loader_store = store.clone();
163 let mut account_loader_handle = tokio::spawn(async move {
164 account_loader_store
165 .stream_network_account_ids(account_tx)
166 .await
167 .context("failed to load network accounts from store")
168 });
169
170 loop {
172 tokio::select! {
173 result = self.coordinator.next() => {
175 result?;
176 },
177 event = mempool_events.try_next() => {
179 let event = event
180 .context("mempool event stream ended")?
181 .context("mempool event stream failed")?;
182
183 self.handle_mempool_event(
184 event.into(),
185 &actor_context,
186 chain_state.clone(),
187 ).await?;
188 },
189 Some(account_id) = account_rx.recv() => {
193 self.handle_loaded_account(account_id, &actor_context).await?;
194 },
195 result = &mut account_loader_handle => {
199 result
200 .context("account loader task panicked")
201 .flatten()?;
202
203 tracing::info!("account loading from store completed");
204 account_loader_handle = tokio::spawn(std::future::pending());
205 },
206 }
207 }
208 }
209
210 #[tracing::instrument(
212 name = "ntx.builder.handle_loaded_accounts",
213 skip(self, account_id, actor_context)
214 )]
215 async fn handle_loaded_account(
216 &mut self,
217 account_id: NetworkAccountId,
218 actor_context: &AccountActorContext,
219 ) -> Result<(), anyhow::Error> {
220 self.coordinator
221 .spawn_actor(AccountOrigin::store(account_id), actor_context)
222 .await?;
223 Ok(())
224 }
225
226 #[tracing::instrument(
229 name = "ntx.builder.handle_mempool_event",
230 skip(self, event, actor_context, chain_state)
231 )]
232 async fn handle_mempool_event(
233 &mut self,
234 event: Arc<MempoolEvent>,
235 actor_context: &AccountActorContext,
236 chain_state: Arc<RwLock<ChainState>>,
237 ) -> Result<(), anyhow::Error> {
238 match event.as_ref() {
239 MempoolEvent::TransactionAdded { account_delta, .. } => {
240 if let Some(AccountUpdateDetails::Delta(delta)) = account_delta {
242 if let Some(network_account) = AccountOrigin::transaction(delta) {
244 let is_creating_account = delta.is_full_state();
246 if is_creating_account {
247 self.coordinator.spawn_actor(network_account, actor_context).await?;
248 }
249 }
250 }
251 self.coordinator.send_targeted(&event).await?;
252 Ok(())
253 },
254 MempoolEvent::BlockCommitted { header, txs } => {
256 self.update_chain_tip(header.as_ref().clone(), chain_state).await;
257 self.coordinator.broadcast(event.clone()).await;
258
259 for tx_id in txs {
262 self.coordinator.drain_predating_events(tx_id);
263 }
264 Ok(())
265 },
266 MempoolEvent::TransactionsReverted(txs) => {
268 self.coordinator.broadcast(event.clone()).await;
269
270 for tx_id in txs {
272 self.coordinator.drain_predating_events(tx_id);
273 }
274 Ok(())
275 },
276 }
277 }
278
279 async fn update_chain_tip(&mut self, tip: BlockHeader, chain_state: Arc<RwLock<ChainState>>) {
283 let mut chain_state = chain_state.write().await;
285
286 let mmr_tip = chain_state.chain_tip_header.clone();
288 chain_state.chain_mmr.add_block(&mmr_tip, true);
289
290 chain_state.chain_tip_header = tip;
292
293 let pruned_block_height =
295 (chain_state.chain_mmr.chain_length().as_usize().saturating_sub(MAX_BLOCK_COUNT))
296 as u32;
297 chain_state.chain_mmr.prune_to(..pruned_block_height.into());
298 }
299}