miden_node_ntx_builder/
builder.rs1use std::num::NonZeroUsize;
2use std::sync::Arc;
3
4use anyhow::Context;
5use futures::TryStreamExt;
6use miden_node_proto::domain::account::NetworkAccountId;
7use miden_node_proto::domain::mempool::MempoolEvent;
8use miden_node_utils::lru_cache::LruCache;
9use miden_protocol::Word;
10use miden_protocol::account::delta::AccountUpdateDetails;
11use miden_protocol::block::BlockHeader;
12use miden_protocol::crypto::merkle::mmr::PartialMmr;
13use miden_protocol::note::NoteScript;
14use miden_protocol::transaction::PartialBlockchain;
15use tokio::sync::{RwLock, mpsc};
16use url::Url;
17
18use crate::MAX_IN_PROGRESS_TXS;
19use crate::actor::{AccountActorContext, AccountOrigin};
20use crate::block_producer::BlockProducerClient;
21use crate::coordinator::Coordinator;
22use crate::store::StoreClient;
23
/// Number of most recent blocks retained in the chain MMR.
///
/// Each time the chain tip is updated, `update_chain_tip` prunes the partial blockchain up to
/// `chain_length - MAX_BLOCK_COUNT` (saturating at zero).
const MAX_BLOCK_COUNT: usize = 4;
29
30#[derive(Debug, Clone)]
36pub struct ChainState {
37 pub chain_tip_header: BlockHeader,
39 pub chain_mmr: PartialBlockchain,
41}
42
43impl ChainState {
44 fn new(chain_tip_header: BlockHeader, chain_mmr: PartialMmr) -> Self {
46 let chain_mmr = PartialBlockchain::new(chain_mmr, [])
47 .expect("partial blockchain should build from partial mmr");
48 Self { chain_tip_header, chain_mmr }
49 }
50
51 pub fn into_parts(self) -> (BlockHeader, PartialBlockchain) {
54 (self.chain_tip_header, self.chain_mmr)
55 }
56}
57
58pub struct NetworkTransactionBuilder {
70 store_url: Url,
72 block_producer_url: Url,
74 validator_url: Url,
76 tx_prover_url: Option<Url>,
79 script_cache: LruCache<Word, NoteScript>,
82 coordinator: Coordinator,
84}
85
86impl NetworkTransactionBuilder {
87 const ACCOUNT_CHANNEL_CAPACITY: usize = 1_000;
89
90 pub fn new(
92 store_url: Url,
93 block_producer_url: Url,
94 validator_url: Url,
95 tx_prover_url: Option<Url>,
96 script_cache_size: NonZeroUsize,
97 ) -> Self {
98 let script_cache = LruCache::new(script_cache_size);
99 let coordinator = Coordinator::new(MAX_IN_PROGRESS_TXS);
100 Self {
101 store_url,
102 block_producer_url,
103 validator_url,
104 tx_prover_url,
105 script_cache,
106 coordinator,
107 }
108 }
109
110 pub async fn run(mut self) -> anyhow::Result<()> {
112 let store = StoreClient::new(self.store_url.clone());
113 let block_producer = BlockProducerClient::new(self.block_producer_url.clone());
114
115 let (chain_tip_header, chain_mmr, mut mempool_events) = loop {
124 let (chain_tip_header, chain_mmr) = store
125 .get_latest_blockchain_data_with_retry()
126 .await?
127 .expect("store should contain a latest block");
128
129 match block_producer
130 .subscribe_to_mempool_with_retry(chain_tip_header.block_num())
131 .await
132 {
133 Ok(subscription) => break (chain_tip_header, chain_mmr, subscription),
134 Err(status) if status.code() == tonic::Code::InvalidArgument => {
135 tracing::error!(err=%status, "mempool subscription failed due to desync, trying again");
136 },
137 Err(err) => return Err(err).context("failed to subscribe to mempool events"),
138 }
139 };
140
141 let chain_state = Arc::new(RwLock::new(ChainState::new(chain_tip_header, chain_mmr)));
143
144 let actor_context = AccountActorContext {
145 block_producer_url: self.block_producer_url.clone(),
146 validator_url: self.validator_url.clone(),
147 tx_prover_url: self.tx_prover_url.clone(),
148 chain_state: chain_state.clone(),
149 store: store.clone(),
150 script_cache: self.script_cache.clone(),
151 };
152
153 let (account_tx, mut account_rx) =
156 mpsc::channel::<NetworkAccountId>(Self::ACCOUNT_CHANNEL_CAPACITY);
157 let account_loader_store = store.clone();
158 let mut account_loader_handle = tokio::spawn(async move {
159 account_loader_store
160 .stream_network_account_ids(account_tx)
161 .await
162 .context("failed to load network accounts from store")
163 });
164
165 loop {
167 tokio::select! {
168 result = self.coordinator.next() => {
170 result?;
171 },
172 event = mempool_events.try_next() => {
174 let event = event
175 .context("mempool event stream ended")?
176 .context("mempool event stream failed")?;
177
178 self.handle_mempool_event(
179 event.into(),
180 &actor_context,
181 chain_state.clone(),
182 ).await?;
183 },
184 Some(account_id) = account_rx.recv() => {
188 self.handle_loaded_account(account_id, &actor_context).await?;
189 },
190 result = &mut account_loader_handle => {
194 result
195 .context("account loader task panicked")
196 .flatten()?;
197
198 tracing::info!("account loading from store completed");
199 account_loader_handle = tokio::spawn(std::future::pending());
200 },
201 }
202 }
203 }
204
205 #[tracing::instrument(
207 name = "ntx.builder.handle_loaded_accounts",
208 skip(self, account_id, actor_context)
209 )]
210 async fn handle_loaded_account(
211 &mut self,
212 account_id: NetworkAccountId,
213 actor_context: &AccountActorContext,
214 ) -> Result<(), anyhow::Error> {
215 self.coordinator
216 .spawn_actor(AccountOrigin::store(account_id), actor_context)
217 .await?;
218 Ok(())
219 }
220
221 #[tracing::instrument(
224 name = "ntx.builder.handle_mempool_event",
225 skip(self, event, actor_context, chain_state)
226 )]
227 async fn handle_mempool_event(
228 &mut self,
229 event: Arc<MempoolEvent>,
230 actor_context: &AccountActorContext,
231 chain_state: Arc<RwLock<ChainState>>,
232 ) -> Result<(), anyhow::Error> {
233 match event.as_ref() {
234 MempoolEvent::TransactionAdded { account_delta, .. } => {
235 if let Some(AccountUpdateDetails::Delta(delta)) = account_delta {
237 if let Some(network_account) = AccountOrigin::transaction(delta) {
239 let is_creating_account = delta.is_full_state();
241 if is_creating_account {
242 self.coordinator.spawn_actor(network_account, actor_context).await?;
243 }
244 }
245 }
246 self.coordinator.send_targeted(&event).await?;
247 Ok(())
248 },
249 MempoolEvent::BlockCommitted { header, txs } => {
251 self.update_chain_tip(header.as_ref().clone(), chain_state).await;
252 self.coordinator.broadcast(event.clone()).await;
253
254 for tx_id in txs {
257 self.coordinator.drain_predating_events(tx_id);
258 }
259 Ok(())
260 },
261 MempoolEvent::TransactionsReverted(txs) => {
263 self.coordinator.broadcast(event.clone()).await;
264
265 for tx_id in txs {
267 self.coordinator.drain_predating_events(tx_id);
268 }
269 Ok(())
270 },
271 }
272 }
273
274 async fn update_chain_tip(&mut self, tip: BlockHeader, chain_state: Arc<RwLock<ChainState>>) {
278 let mut chain_state = chain_state.write().await;
280
281 let mmr_tip = chain_state.chain_tip_header.clone();
283 chain_state.chain_mmr.add_block(&mmr_tip, true);
284
285 chain_state.chain_tip_header = tip;
287
288 let pruned_block_height =
290 (chain_state.chain_mmr.chain_length().as_usize().saturating_sub(MAX_BLOCK_COUNT))
291 as u32;
292 chain_state.chain_mmr.prune_to(..pruned_block_height.into());
293 }
294}