// miden_node_ntx_builder/builder.rs

1use std::pin::Pin;
2use std::sync::Arc;
3
4use anyhow::Context;
5use futures::Stream;
6use miden_node_proto::domain::account::NetworkAccountId;
7use miden_node_proto::domain::mempool::MempoolEvent;
8use miden_protocol::account::delta::AccountUpdateDetails;
9use miden_protocol::block::BlockHeader;
10use tokio::net::TcpListener;
11use tokio::sync::{RwLock, mpsc};
12use tokio::task::JoinSet;
13use tokio_stream::StreamExt;
14use tonic::Status;
15
16use crate::NtxBuilderConfig;
17use crate::actor::{AccountActorContext, AccountOrigin, ActorRequest};
18use crate::chain_state::ChainState;
19use crate::clients::StoreClient;
20use crate::coordinator::Coordinator;
21use crate::db::Db;
22use crate::server::NtxBuilderRpcServer;
23
24// NETWORK TRANSACTION BUILDER
25// ================================================================================================
26
/// A boxed, pinned stream of mempool events with a `'static` lifetime.
///
/// Boxing gives the stream a `'static` lifetime by ensuring it owns all its data, avoiding
/// complex lifetime annotations that would otherwise be required when storing `impl TryStream`.
///
/// Each item is either a [`MempoolEvent`] or a gRPC [`Status`] error from the event source.
pub(crate) type MempoolEventStream =
    Pin<Box<dyn Stream<Item = Result<MempoolEvent, Status>> + Send>>;
33
/// Network transaction builder component.
///
/// The network transaction builder is in charge of building transactions that consume notes
/// against network accounts. These notes are identified and communicated by the block producer.
/// The service maintains a list of unconsumed notes and periodically executes and proves
/// transactions that consume them (reaching out to the store to retrieve state as necessary).
///
/// The builder manages the tasks for every network account on the chain through the coordinator.
///
/// Create an instance using [`NtxBuilderConfig::build()`].
pub struct NetworkTransactionBuilder {
    /// Configuration for the builder.
    config: NtxBuilderConfig,
    /// Coordinator for managing actor tasks.
    coordinator: Coordinator,
    /// Client for the store gRPC API.
    store: StoreClient,
    /// Database for persistent state.
    ///
    /// All writes are performed by the event loop; actors request writes through
    /// `actor_request_rx` rather than writing directly.
    db: Db,
    /// Shared chain state updated by the event loop and read by actors.
    chain_state: Arc<RwLock<ChainState>>,
    /// Context shared with all account actors.
    actor_context: AccountActorContext,
    /// Stream of mempool events from the block producer.
    mempool_events: MempoolEventStream,
    /// Database update requests from account actors.
    ///
    /// We keep database writes centralized so this is how actors communicate
    /// items to write.
    actor_request_rx: mpsc::Receiver<ActorRequest>,
}
65
66impl NetworkTransactionBuilder {
67    #[expect(clippy::too_many_arguments)]
68    pub(crate) fn new(
69        config: NtxBuilderConfig,
70        coordinator: Coordinator,
71        store: StoreClient,
72        db: Db,
73        chain_state: Arc<RwLock<ChainState>>,
74        actor_context: AccountActorContext,
75        mempool_events: MempoolEventStream,
76        actor_request_rx: mpsc::Receiver<ActorRequest>,
77    ) -> Self {
78        Self {
79            config,
80            coordinator,
81            store,
82            db,
83            chain_state,
84            actor_context,
85            mempool_events,
86            actor_request_rx,
87        }
88    }
89
90    /// Runs the network transaction builder event loop until a fatal error occurs.
91    ///
92    /// If a `TcpListener` is provided, a gRPC server is also spawned to expose the
93    /// `GetNoteError` endpoint.
94    ///
95    /// This method:
96    /// 1. Optionally starts a gRPC server for note error queries
97    /// 2. Spawns a background task to load existing network accounts from the store
98    /// 3. Runs the main event loop, processing mempool events and managing actors
99    ///
100    /// # Errors
101    ///
102    /// Returns an error if:
103    /// - The mempool event stream ends unexpectedly
104    /// - An actor encounters a fatal error
105    /// - The account loader task fails
106    /// - The gRPC server fails
107    pub async fn run(self, listener: Option<TcpListener>) -> anyhow::Result<()> {
108        let mut join_set = JoinSet::new();
109
110        // Start the gRPC server if a listener is provided.
111        if let Some(listener) = listener {
112            let server = NtxBuilderRpcServer::new(self.db.clone());
113            join_set.spawn(async move {
114                server.serve(listener).await.context("ntx-builder gRPC server failed")
115            });
116        }
117
118        join_set.spawn(self.run_event_loop());
119
120        // Wait for either the event loop or the gRPC server to complete.
121        // Any completion is treated as fatal.
122        if let Some(result) = join_set.join_next().await {
123            result.context("ntx-builder task panicked")??;
124        }
125
126        Ok(())
127    }
128
    /// Runs the main event loop.
    ///
    /// Drives four concurrent sources of work via `select!`: actor completions from the
    /// coordinator, mempool events from the block producer, account IDs streamed from the
    /// store at startup, and write requests from account actors. Returns only on a fatal
    /// error.
    async fn run_event_loop(mut self) -> anyhow::Result<()> {
        // Spawn a background task to load network accounts from the store.
        // Accounts are sent through a channel and processed in the main event loop.
        let (account_tx, mut account_rx) =
            mpsc::channel::<NetworkAccountId>(self.config.account_channel_capacity);
        let account_loader_store = self.store.clone();
        let mut account_loader_handle = tokio::spawn(async move {
            account_loader_store
                .stream_network_account_ids(account_tx)
                .await
                .context("failed to load network accounts from store")
        });

        // Main event loop.
        loop {
            tokio::select! {
                // Handle actor result. If a timed-out actor needs respawning, do so.
                result = self.coordinator.next() => {
                    if let Some(account_id) = result? {
                        self.coordinator
                            .spawn_actor(AccountOrigin::store(account_id), &self.actor_context);
                    }
                },
                // Handle mempool events. A `None` (stream ended) or an inner `Status`
                // error are both fatal.
                event = self.mempool_events.next() => {
                    let event = event
                        .context("mempool event stream ended")?
                        .context("mempool event stream failed")?;

                    self.handle_mempool_event(event).await?;
                },
                // Handle account batches loaded from the store.
                // Once all accounts are loaded, the channel closes and this branch
                // becomes inactive (recv returns None and we stop matching).
                Some(account_id) = account_rx.recv() => {
                    self.handle_loaded_account(account_id).await?;
                },
                // Handle requests from actors.
                Some(request) = self.actor_request_rx.recv() => {
                    self.handle_actor_request(request).await?;
                },
                // Handle account loader task completion/failure.
                // If the task fails, we abort since the builder would be in a degraded state
                // where existing notes against network accounts won't be processed.
                result = &mut account_loader_handle => {
                    // `context` wraps the `JoinError` (panic/cancellation); `flatten`
                    // collapses that with the task's own `Result` so `?` sees one error.
                    result
                        .context("account loader task panicked")
                        .flatten()?;

                    tracing::info!("account loading from store completed");
                    // Replace the finished handle with a never-completing task so this
                    // branch stays inert for the remainder of the loop's lifetime.
                    account_loader_handle = tokio::spawn(std::future::pending());
                },
            }
        }
    }
185
186    /// Handles account IDs loaded from the store by syncing state to DB and spawning actors.
187    #[tracing::instrument(name = "ntx.builder.handle_loaded_account", skip(self, account_id))]
188    async fn handle_loaded_account(
189        &mut self,
190        account_id: NetworkAccountId,
191    ) -> Result<(), anyhow::Error> {
192        // Fetch account from store and write to DB.
193        let account = self
194            .store
195            .get_network_account(account_id)
196            .await
197            .context("failed to load account from store")?
198            .context("account should exist in store")?;
199
200        let block_num = self.chain_state.read().await.chain_tip_header.block_num();
201        let notes = self
202            .store
203            .get_unconsumed_network_notes(account_id, block_num.as_u32())
204            .await
205            .context("failed to load notes from store")?;
206
207        // Write account and notes to DB.
208        self.db
209            .sync_account_from_store(account_id, account.clone(), notes.clone())
210            .await
211            .context("failed to sync account to DB")?;
212
213        self.coordinator
214            .spawn_actor(AccountOrigin::store(account_id), &self.actor_context);
215        Ok(())
216    }
217
    /// Handles mempool events by writing to DB first, then notifying actors.
    ///
    /// Every branch persists the event's effects via `write_event` before any actor is
    /// notified, so actors never observe an event whose effects are not yet in the DB.
    #[tracing::instrument(name = "ntx.builder.handle_mempool_event", skip(self, event))]
    async fn handle_mempool_event(&mut self, event: MempoolEvent) -> Result<(), anyhow::Error> {
        match &event {
            MempoolEvent::TransactionAdded { account_delta, .. } => {
                // Write event effects to DB first.
                self.coordinator
                    .write_event(&event)
                    .await
                    .context("failed to write TransactionAdded to DB")?;

                // Handle account deltas in case an account is being created.
                if let Some(AccountUpdateDetails::Delta(delta)) = account_delta {
                    // Handle account deltas for network accounts only.
                    if let Some(network_account) = AccountOrigin::transaction(delta) {
                        // Spawn new actors if a transaction creates a new network account.
                        // A full-state delta indicates creation rather than an update.
                        let is_creating_account = delta.is_full_state();
                        if is_creating_account {
                            self.coordinator.spawn_actor(network_account, &self.actor_context);
                        }
                    }
                }
                // Forward the event to targeted actors. `send_targeted` reports targets
                // without a live actor; respawn those from store state so the event is
                // eventually handled.
                let inactive_targets = self.coordinator.send_targeted(&event);
                for account_id in inactive_targets {
                    self.coordinator
                        .spawn_actor(AccountOrigin::store(account_id), &self.actor_context);
                }
                Ok(())
            },
            // Update chain state and notify affected actors.
            MempoolEvent::BlockCommitted { header, .. } => {
                // Write event effects to DB first.
                let result = self
                    .coordinator
                    .write_event(&event)
                    .await
                    .context("failed to write BlockCommitted to DB")?;

                // Advance the in-memory chain tip and MMR before waking actors.
                self.update_chain_tip(header.as_ref().clone()).await;
                self.coordinator.notify_accounts(&result.accounts_to_notify);
                Ok(())
            },
            // Notify affected actors (reverted account actors will self-cancel when they
            // detect their account has been removed from the DB).
            MempoolEvent::TransactionsReverted(_) => {
                // Write event effects to DB first.
                let result = self
                    .coordinator
                    .write_event(&event)
                    .await
                    .context("failed to write TransactionsReverted to DB")?;

                self.coordinator.notify_accounts(&result.accounts_to_notify);
                Ok(())
            },
        }
    }
275
276    /// Processes a request from an account actor.
277    async fn handle_actor_request(&mut self, request: ActorRequest) -> Result<(), anyhow::Error> {
278        match request {
279            ActorRequest::NotesFailed { failed_notes, block_num, ack_tx } => {
280                self.db
281                    .notes_failed(failed_notes, block_num)
282                    .await
283                    .context("failed to mark notes as failed")?;
284                let _ = ack_tx.send(());
285            },
286            ActorRequest::CacheNoteScript { script_root, script } => {
287                self.db
288                    .insert_note_script(script_root, &script)
289                    .await
290                    .context("failed to cache note script")?;
291            },
292        }
293        Ok(())
294    }
295
296    /// Updates the chain tip and prunes old blocks from the MMR.
297    async fn update_chain_tip(&mut self, tip: BlockHeader) {
298        let mut chain_state = self.chain_state.write().await;
299
300        // Update MMR which lags by one block.
301        let mmr_tip = chain_state.chain_tip_header.clone();
302        Arc::make_mut(&mut chain_state.chain_mmr).add_block(&mmr_tip, true);
303
304        // Set the new tip.
305        chain_state.chain_tip_header = tip;
306
307        // Keep MMR pruned.
308        let pruned_block_height = (chain_state
309            .chain_mmr
310            .chain_length()
311            .as_usize()
312            .saturating_sub(self.config.max_block_count)) as u32;
313        Arc::make_mut(&mut chain_state.chain_mmr).prune_to(..pruned_block_height.into());
314    }
315}