Skip to main content

miden_node_ntx_builder/
builder.rs

1use std::num::NonZeroUsize;
2use std::sync::Arc;
3
4use anyhow::Context;
5use futures::TryStreamExt;
6use miden_node_proto::domain::account::NetworkAccountId;
7use miden_node_proto::domain::mempool::MempoolEvent;
8use miden_node_utils::lru_cache::LruCache;
9use miden_protocol::Word;
10use miden_protocol::account::delta::AccountUpdateDetails;
11use miden_protocol::block::BlockHeader;
12use miden_protocol::crypto::merkle::mmr::PartialMmr;
13use miden_protocol::note::NoteScript;
14use miden_protocol::transaction::PartialBlockchain;
15use miden_remote_prover_client::remote_prover::tx_prover::RemoteTransactionProver;
16use tokio::sync::{RwLock, mpsc};
17use url::Url;
18
19use crate::MAX_IN_PROGRESS_TXS;
20use crate::actor::{AccountActorContext, AccountOrigin};
21use crate::block_producer::BlockProducerClient;
22use crate::coordinator::Coordinator;
23use crate::store::StoreClient;
24use crate::validator::ValidatorClient;
25
26// CONSTANTS
27// =================================================================================================
28
/// Number of most recent blocks kept in the in-memory partial MMR while following the chain tip.
/// Blocks older than this window are pruned each time the tip is updated.
const MAX_BLOCK_COUNT: usize = 4;
31
32// CHAIN STATE
33// ================================================================================================
34
35/// Contains information about the chain that is relevant to the [`NetworkTransactionBuilder`] and
36/// all account actors managed by the [`Coordinator`]
37#[derive(Debug, Clone)]
38pub struct ChainState {
39    /// The current tip of the chain.
40    pub chain_tip_header: BlockHeader,
41    /// A partial representation of the latest state of the chain.
42    pub chain_mmr: PartialBlockchain,
43}
44
45impl ChainState {
46    /// Constructs a new instance of [`ChainState`].
47    fn new(chain_tip_header: BlockHeader, chain_mmr: PartialMmr) -> Self {
48        let chain_mmr = PartialBlockchain::new(chain_mmr, [])
49            .expect("partial blockchain should build from partial mmr");
50        Self { chain_tip_header, chain_mmr }
51    }
52
53    /// Consumes the chain state and returns the chain tip header and the partial blockchain as a
54    /// tuple.
55    pub fn into_parts(self) -> (BlockHeader, PartialBlockchain) {
56        (self.chain_tip_header, self.chain_mmr)
57    }
58}
59
60// NETWORK TRANSACTION BUILDER
61// ================================================================================================
62
63/// Network transaction builder component.
64///
65/// The network transaction builder is in in charge of building transactions that consume notes
66/// against network accounts. These notes are identified and communicated by the block producer.
67/// The service maintains a list of unconsumed notes and periodically executes and proves
68/// transactions that consume them (reaching out to the store to retrieve state as necessary).
69///
70/// The builder manages the tasks for every network account on the chain through the coordinator.
71pub struct NetworkTransactionBuilder {
72    /// Address of the store gRPC server.
73    store_url: Url,
74    /// Address of the block producer gRPC server.
75    block_producer_url: Url,
76    /// Address of the Validator server.
77    validator_url: Url,
78    /// Address of the remote prover. If `None`, transactions will be proven locally, which is
79    /// undesirable due to the performance impact.
80    tx_prover_url: Option<Url>,
81    /// Shared LRU cache for storing retrieved note scripts to avoid repeated store calls.
82    /// This cache is shared across all account actors.
83    script_cache: LruCache<Word, NoteScript>,
84    /// Coordinator for managing actor tasks.
85    coordinator: Coordinator,
86}
87
88impl NetworkTransactionBuilder {
89    /// Channel capacity for account loading.
90    const ACCOUNT_CHANNEL_CAPACITY: usize = 1_000;
91
92    /// Creates a new instance of the network transaction builder.
93    pub fn new(
94        store_url: Url,
95        block_producer_url: Url,
96        validator_url: Url,
97        tx_prover_url: Option<Url>,
98        script_cache_size: NonZeroUsize,
99    ) -> Self {
100        let script_cache = LruCache::new(script_cache_size);
101        let coordinator = Coordinator::new(MAX_IN_PROGRESS_TXS);
102        Self {
103            store_url,
104            block_producer_url,
105            validator_url,
106            tx_prover_url,
107            script_cache,
108            coordinator,
109        }
110    }
111
112    /// Runs the network transaction builder until a fatal error occurs.
113    pub async fn run(mut self) -> anyhow::Result<()> {
114        let store = StoreClient::new(self.store_url.clone());
115        let block_producer = BlockProducerClient::new(self.block_producer_url.clone());
116
117        // Loop until we successfully subscribe.
118        //
119        // The mempool rejects our subscription if we don't have the same view of the chain aka
120        // if our chain tip does not match the mempools. This can occur if a new block is committed
121        // _after_ we fetch the chain tip from the store but _before_ our subscription request is
122        // handled.
123        //
124        // This is a hack-around for https://github.com/0xMiden/miden-node/issues/1566.
125        let (chain_tip_header, chain_mmr, mut mempool_events) = loop {
126            let (chain_tip_header, chain_mmr) = store
127                .get_latest_blockchain_data_with_retry()
128                .await?
129                .expect("store should contain a latest block");
130
131            match block_producer
132                .subscribe_to_mempool_with_retry(chain_tip_header.block_num())
133                .await
134            {
135                Ok(subscription) => break (chain_tip_header, chain_mmr, subscription),
136                Err(status) if status.code() == tonic::Code::InvalidArgument => {
137                    tracing::error!(err=%status, "mempool subscription failed due to desync, trying again");
138                },
139                Err(err) => return Err(err).context("failed to subscribe to mempool events"),
140            }
141        };
142
143        // Create chain state that will be updated by the coordinator and read by actors.
144        let chain_state = Arc::new(RwLock::new(ChainState::new(chain_tip_header, chain_mmr)));
145
146        let validator = ValidatorClient::new(self.validator_url.clone());
147        let prover = self.tx_prover_url.clone().map(RemoteTransactionProver::new);
148
149        let actor_context = AccountActorContext {
150            block_producer: block_producer.clone(),
151            validator,
152            prover,
153            chain_state: chain_state.clone(),
154            store: store.clone(),
155            script_cache: self.script_cache.clone(),
156        };
157
158        // Spawn a background task to load network accounts from the store.
159        // Accounts are sent through a channel in batches and processed in the main event loop.
160        let (account_tx, mut account_rx) =
161            mpsc::channel::<NetworkAccountId>(Self::ACCOUNT_CHANNEL_CAPACITY);
162        let account_loader_store = store.clone();
163        let mut account_loader_handle = tokio::spawn(async move {
164            account_loader_store
165                .stream_network_account_ids(account_tx)
166                .await
167                .context("failed to load network accounts from store")
168        });
169
170        // Main loop which manages actors and passes mempool events to them.
171        loop {
172            tokio::select! {
173                // Handle actor result.
174                result = self.coordinator.next() => {
175                    result?;
176                },
177                // Handle mempool events.
178                event = mempool_events.try_next() => {
179                    let event = event
180                        .context("mempool event stream ended")?
181                        .context("mempool event stream failed")?;
182
183                    self.handle_mempool_event(
184                        event.into(),
185                        &actor_context,
186                        chain_state.clone(),
187                    ).await?;
188                },
189                // Handle account batches loaded from the store.
190                // Once all accounts are loaded, the channel closes and this branch
191                // becomes inactive (recv returns None and we stop matching).
192                Some(account_id) = account_rx.recv() => {
193                    self.handle_loaded_account(account_id, &actor_context).await?;
194                },
195                // Handle account loader task completion/failure.
196                // If the task fails, we abort since the builder would be in a degraded state
197                // where existing notes against network accounts won't be processed.
198                result = &mut account_loader_handle => {
199                    result
200                        .context("account loader task panicked")
201                        .flatten()?;
202
203                    tracing::info!("account loading from store completed");
204                    account_loader_handle = tokio::spawn(std::future::pending());
205                },
206            }
207        }
208    }
209
210    /// Handles a batch of account IDs loaded from the store by spawning actors for them.
211    #[tracing::instrument(
212        name = "ntx.builder.handle_loaded_accounts",
213        skip(self, account_id, actor_context)
214    )]
215    async fn handle_loaded_account(
216        &mut self,
217        account_id: NetworkAccountId,
218        actor_context: &AccountActorContext,
219    ) -> Result<(), anyhow::Error> {
220        self.coordinator
221            .spawn_actor(AccountOrigin::store(account_id), actor_context)
222            .await?;
223        Ok(())
224    }
225
226    /// Handles mempool events by sending them to actors via the coordinator and/or spawning new
227    /// actors as required.
228    #[tracing::instrument(
229        name = "ntx.builder.handle_mempool_event",
230        skip(self, event, actor_context, chain_state)
231    )]
232    async fn handle_mempool_event(
233        &mut self,
234        event: Arc<MempoolEvent>,
235        actor_context: &AccountActorContext,
236        chain_state: Arc<RwLock<ChainState>>,
237    ) -> Result<(), anyhow::Error> {
238        match event.as_ref() {
239            MempoolEvent::TransactionAdded { account_delta, .. } => {
240                // Handle account deltas in case an account is being created.
241                if let Some(AccountUpdateDetails::Delta(delta)) = account_delta {
242                    // Handle account deltas for network accounts only.
243                    if let Some(network_account) = AccountOrigin::transaction(delta) {
244                        // Spawn new actors if a transaction creates a new network account
245                        let is_creating_account = delta.is_full_state();
246                        if is_creating_account {
247                            self.coordinator.spawn_actor(network_account, actor_context).await?;
248                        }
249                    }
250                }
251                self.coordinator.send_targeted(&event).await?;
252                Ok(())
253            },
254            // Update chain state and broadcast.
255            MempoolEvent::BlockCommitted { header, txs } => {
256                self.update_chain_tip(header.as_ref().clone(), chain_state).await;
257                self.coordinator.broadcast(event.clone()).await;
258
259                // All transactions pertaining to predating events should now be available through
260                // the store. So we can now drain them.
261                for tx_id in txs {
262                    self.coordinator.drain_predating_events(tx_id);
263                }
264                Ok(())
265            },
266            // Broadcast to all actors.
267            MempoolEvent::TransactionsReverted(txs) => {
268                self.coordinator.broadcast(event.clone()).await;
269
270                // Reverted predating transactions need not be processed.
271                for tx_id in txs {
272                    self.coordinator.drain_predating_events(tx_id);
273                }
274                Ok(())
275            },
276        }
277    }
278
279    /// Updates the chain tip and MMR block count.
280    ///
281    /// Blocks in the MMR are pruned if the block count exceeds the maximum.
282    async fn update_chain_tip(&mut self, tip: BlockHeader, chain_state: Arc<RwLock<ChainState>>) {
283        // Lock the chain state.
284        let mut chain_state = chain_state.write().await;
285
286        // Update MMR which lags by one block.
287        let mmr_tip = chain_state.chain_tip_header.clone();
288        chain_state.chain_mmr.add_block(&mmr_tip, true);
289
290        // Set the new tip.
291        chain_state.chain_tip_header = tip;
292
293        // Keep MMR pruned.
294        let pruned_block_height =
295            (chain_state.chain_mmr.chain_length().as_usize().saturating_sub(MAX_BLOCK_COUNT))
296                as u32;
297        chain_state.chain_mmr.prune_to(..pruned_block_height.into());
298    }
299}