Skip to main content

miden_node_ntx_builder/
builder.rs

1use std::num::NonZeroUsize;
2use std::sync::Arc;
3
4use anyhow::Context;
5use futures::TryStreamExt;
6use miden_node_proto::domain::account::NetworkAccountId;
7use miden_node_proto::domain::mempool::MempoolEvent;
8use miden_node_utils::lru_cache::LruCache;
9use miden_protocol::Word;
10use miden_protocol::account::delta::AccountUpdateDetails;
11use miden_protocol::block::BlockHeader;
12use miden_protocol::crypto::merkle::mmr::PartialMmr;
13use miden_protocol::note::NoteScript;
14use miden_protocol::transaction::PartialBlockchain;
15use tokio::sync::{RwLock, mpsc};
16use url::Url;
17
18use crate::MAX_IN_PROGRESS_TXS;
19use crate::actor::{AccountActorContext, AccountOrigin};
20use crate::block_producer::BlockProducerClient;
21use crate::coordinator::Coordinator;
22use crate::store::StoreClient;
23
24// CONSTANTS
25// =================================================================================================
26
27/// The maximum number of blocks to keep in memory while tracking the chain tip.
28const MAX_BLOCK_COUNT: usize = 4;
29
30// CHAIN STATE
31// ================================================================================================
32
33/// Contains information about the chain that is relevant to the [`NetworkTransactionBuilder`] and
34/// all account actors managed by the [`Coordinator`]
35#[derive(Debug, Clone)]
36pub struct ChainState {
37    /// The current tip of the chain.
38    pub chain_tip_header: BlockHeader,
39    /// A partial representation of the latest state of the chain.
40    pub chain_mmr: PartialBlockchain,
41}
42
43impl ChainState {
44    /// Constructs a new instance of [`ChainState`].
45    fn new(chain_tip_header: BlockHeader, chain_mmr: PartialMmr) -> Self {
46        let chain_mmr = PartialBlockchain::new(chain_mmr, [])
47            .expect("partial blockchain should build from partial mmr");
48        Self { chain_tip_header, chain_mmr }
49    }
50
51    /// Consumes the chain state and returns the chain tip header and the partial blockchain as a
52    /// tuple.
53    pub fn into_parts(self) -> (BlockHeader, PartialBlockchain) {
54        (self.chain_tip_header, self.chain_mmr)
55    }
56}
57
58// NETWORK TRANSACTION BUILDER
59// ================================================================================================
60
61/// Network transaction builder component.
62///
63/// The network transaction builder is in in charge of building transactions that consume notes
64/// against network accounts. These notes are identified and communicated by the block producer.
65/// The service maintains a list of unconsumed notes and periodically executes and proves
66/// transactions that consume them (reaching out to the store to retrieve state as necessary).
67///
68/// The builder manages the tasks for every network account on the chain through the coordinator.
69pub struct NetworkTransactionBuilder {
70    /// Address of the store gRPC server.
71    store_url: Url,
72    /// Address of the block producer gRPC server.
73    block_producer_url: Url,
74    /// Address of the Validator server.
75    validator_url: Url,
76    /// Address of the remote prover. If `None`, transactions will be proven locally, which is
77    /// undesirable due to the performance impact.
78    tx_prover_url: Option<Url>,
79    /// Shared LRU cache for storing retrieved note scripts to avoid repeated store calls.
80    /// This cache is shared across all account actors.
81    script_cache: LruCache<Word, NoteScript>,
82    /// Coordinator for managing actor tasks.
83    coordinator: Coordinator,
84}
85
86impl NetworkTransactionBuilder {
87    /// Channel capacity for account loading.
88    const ACCOUNT_CHANNEL_CAPACITY: usize = 1_000;
89
90    /// Creates a new instance of the network transaction builder.
91    pub fn new(
92        store_url: Url,
93        block_producer_url: Url,
94        validator_url: Url,
95        tx_prover_url: Option<Url>,
96        script_cache_size: NonZeroUsize,
97    ) -> Self {
98        let script_cache = LruCache::new(script_cache_size);
99        let coordinator = Coordinator::new(MAX_IN_PROGRESS_TXS);
100        Self {
101            store_url,
102            block_producer_url,
103            validator_url,
104            tx_prover_url,
105            script_cache,
106            coordinator,
107        }
108    }
109
110    /// Runs the network transaction builder until a fatal error occurs.
111    pub async fn run(mut self) -> anyhow::Result<()> {
112        let store = StoreClient::new(self.store_url.clone());
113        let block_producer = BlockProducerClient::new(self.block_producer_url.clone());
114
115        // Loop until we successfully subscribe.
116        //
117        // The mempool rejects our subscription if we don't have the same view of the chain aka
118        // if our chain tip does not match the mempools. This can occur if a new block is committed
119        // _after_ we fetch the chain tip from the store but _before_ our subscription request is
120        // handled.
121        //
122        // This is a hack-around for https://github.com/0xMiden/miden-node/issues/1566.
123        let (chain_tip_header, chain_mmr, mut mempool_events) = loop {
124            let (chain_tip_header, chain_mmr) = store
125                .get_latest_blockchain_data_with_retry()
126                .await?
127                .expect("store should contain a latest block");
128
129            match block_producer
130                .subscribe_to_mempool_with_retry(chain_tip_header.block_num())
131                .await
132            {
133                Ok(subscription) => break (chain_tip_header, chain_mmr, subscription),
134                Err(status) if status.code() == tonic::Code::InvalidArgument => {
135                    tracing::error!(err=%status, "mempool subscription failed due to desync, trying again");
136                },
137                Err(err) => return Err(err).context("failed to subscribe to mempool events"),
138            }
139        };
140
141        // Create chain state that will be updated by the coordinator and read by actors.
142        let chain_state = Arc::new(RwLock::new(ChainState::new(chain_tip_header, chain_mmr)));
143
144        let actor_context = AccountActorContext {
145            block_producer_url: self.block_producer_url.clone(),
146            validator_url: self.validator_url.clone(),
147            tx_prover_url: self.tx_prover_url.clone(),
148            chain_state: chain_state.clone(),
149            store: store.clone(),
150            script_cache: self.script_cache.clone(),
151        };
152
153        // Spawn a background task to load network accounts from the store.
154        // Accounts are sent through a channel in batches and processed in the main event loop.
155        let (account_tx, mut account_rx) =
156            mpsc::channel::<NetworkAccountId>(Self::ACCOUNT_CHANNEL_CAPACITY);
157        let account_loader_store = store.clone();
158        let mut account_loader_handle = tokio::spawn(async move {
159            account_loader_store
160                .stream_network_account_ids(account_tx)
161                .await
162                .context("failed to load network accounts from store")
163        });
164
165        // Main loop which manages actors and passes mempool events to them.
166        loop {
167            tokio::select! {
168                // Handle actor result.
169                result = self.coordinator.next() => {
170                    result?;
171                },
172                // Handle mempool events.
173                event = mempool_events.try_next() => {
174                    let event = event
175                        .context("mempool event stream ended")?
176                        .context("mempool event stream failed")?;
177
178                    self.handle_mempool_event(
179                        event.into(),
180                        &actor_context,
181                        chain_state.clone(),
182                    ).await?;
183                },
184                // Handle account batches loaded from the store.
185                // Once all accounts are loaded, the channel closes and this branch
186                // becomes inactive (recv returns None and we stop matching).
187                Some(account_id) = account_rx.recv() => {
188                    self.handle_loaded_account(account_id, &actor_context).await?;
189                },
190                // Handle account loader task completion/failure.
191                // If the task fails, we abort since the builder would be in a degraded state
192                // where existing notes against network accounts won't be processed.
193                result = &mut account_loader_handle => {
194                    result
195                        .context("account loader task panicked")
196                        .flatten()?;
197
198                    tracing::info!("account loading from store completed");
199                    account_loader_handle = tokio::spawn(std::future::pending());
200                },
201            }
202        }
203    }
204
205    /// Handles a batch of account IDs loaded from the store by spawning actors for them.
206    #[tracing::instrument(
207        name = "ntx.builder.handle_loaded_accounts",
208        skip(self, account_id, actor_context)
209    )]
210    async fn handle_loaded_account(
211        &mut self,
212        account_id: NetworkAccountId,
213        actor_context: &AccountActorContext,
214    ) -> Result<(), anyhow::Error> {
215        self.coordinator
216            .spawn_actor(AccountOrigin::store(account_id), actor_context)
217            .await?;
218        Ok(())
219    }
220
221    /// Handles mempool events by sending them to actors via the coordinator and/or spawning new
222    /// actors as required.
223    #[tracing::instrument(
224        name = "ntx.builder.handle_mempool_event",
225        skip(self, event, actor_context, chain_state)
226    )]
227    async fn handle_mempool_event(
228        &mut self,
229        event: Arc<MempoolEvent>,
230        actor_context: &AccountActorContext,
231        chain_state: Arc<RwLock<ChainState>>,
232    ) -> Result<(), anyhow::Error> {
233        match event.as_ref() {
234            MempoolEvent::TransactionAdded { account_delta, .. } => {
235                // Handle account deltas in case an account is being created.
236                if let Some(AccountUpdateDetails::Delta(delta)) = account_delta {
237                    // Handle account deltas for network accounts only.
238                    if let Some(network_account) = AccountOrigin::transaction(delta) {
239                        // Spawn new actors if a transaction creates a new network account
240                        let is_creating_account = delta.is_full_state();
241                        if is_creating_account {
242                            self.coordinator.spawn_actor(network_account, actor_context).await?;
243                        }
244                    }
245                }
246                self.coordinator.send_targeted(&event).await?;
247                Ok(())
248            },
249            // Update chain state and broadcast.
250            MempoolEvent::BlockCommitted { header, txs } => {
251                self.update_chain_tip(header.as_ref().clone(), chain_state).await;
252                self.coordinator.broadcast(event.clone()).await;
253
254                // All transactions pertaining to predating events should now be available through
255                // the store. So we can now drain them.
256                for tx_id in txs {
257                    self.coordinator.drain_predating_events(tx_id);
258                }
259                Ok(())
260            },
261            // Broadcast to all actors.
262            MempoolEvent::TransactionsReverted(txs) => {
263                self.coordinator.broadcast(event.clone()).await;
264
265                // Reverted predating transactions need not be processed.
266                for tx_id in txs {
267                    self.coordinator.drain_predating_events(tx_id);
268                }
269                Ok(())
270            },
271        }
272    }
273
274    /// Updates the chain tip and MMR block count.
275    ///
276    /// Blocks in the MMR are pruned if the block count exceeds the maximum.
277    async fn update_chain_tip(&mut self, tip: BlockHeader, chain_state: Arc<RwLock<ChainState>>) {
278        // Lock the chain state.
279        let mut chain_state = chain_state.write().await;
280
281        // Update MMR which lags by one block.
282        let mmr_tip = chain_state.chain_tip_header.clone();
283        chain_state.chain_mmr.add_block(&mmr_tip, true);
284
285        // Set the new tip.
286        chain_state.chain_tip_header = tip;
287
288        // Keep MMR pruned.
289        let pruned_block_height =
290            (chain_state.chain_mmr.chain_length().as_usize().saturating_sub(MAX_BLOCK_COUNT))
291                as u32;
292        chain_state.chain_mmr.prune_to(..pruned_block_height.into());
293    }
294}