miden_node_ntx_builder/
builder.rs1use std::pin::Pin;
2use std::sync::Arc;
3
4use anyhow::Context;
5use futures::Stream;
6use miden_node_proto::domain::account::NetworkAccountId;
7use miden_node_proto::domain::mempool::MempoolEvent;
8use miden_protocol::account::delta::AccountUpdateDetails;
9use miden_protocol::block::BlockHeader;
10use tokio::net::TcpListener;
11use tokio::sync::{RwLock, mpsc};
12use tokio::task::JoinSet;
13use tokio_stream::StreamExt;
14use tonic::Status;
15
16use crate::NtxBuilderConfig;
17use crate::actor::{AccountActorContext, AccountOrigin, ActorRequest};
18use crate::chain_state::ChainState;
19use crate::clients::StoreClient;
20use crate::coordinator::Coordinator;
21use crate::db::Db;
22use crate::server::NtxBuilderRpcServer;
23
24pub(crate) type MempoolEventStream =
32 Pin<Box<dyn Stream<Item = Result<MempoolEvent, Status>> + Send>>;
33
34pub struct NetworkTransactionBuilder {
45 config: NtxBuilderConfig,
47 coordinator: Coordinator,
49 store: StoreClient,
51 db: Db,
53 chain_state: Arc<RwLock<ChainState>>,
55 actor_context: AccountActorContext,
57 mempool_events: MempoolEventStream,
59 actor_request_rx: mpsc::Receiver<ActorRequest>,
64}
65
66impl NetworkTransactionBuilder {
67 #[expect(clippy::too_many_arguments)]
68 pub(crate) fn new(
69 config: NtxBuilderConfig,
70 coordinator: Coordinator,
71 store: StoreClient,
72 db: Db,
73 chain_state: Arc<RwLock<ChainState>>,
74 actor_context: AccountActorContext,
75 mempool_events: MempoolEventStream,
76 actor_request_rx: mpsc::Receiver<ActorRequest>,
77 ) -> Self {
78 Self {
79 config,
80 coordinator,
81 store,
82 db,
83 chain_state,
84 actor_context,
85 mempool_events,
86 actor_request_rx,
87 }
88 }
89
90 pub async fn run(self, listener: Option<TcpListener>) -> anyhow::Result<()> {
108 let mut join_set = JoinSet::new();
109
110 if let Some(listener) = listener {
112 let server = NtxBuilderRpcServer::new(self.db.clone());
113 join_set.spawn(async move {
114 server.serve(listener).await.context("ntx-builder gRPC server failed")
115 });
116 }
117
118 join_set.spawn(self.run_event_loop());
119
120 if let Some(result) = join_set.join_next().await {
123 result.context("ntx-builder task panicked")??;
124 }
125
126 Ok(())
127 }
128
129 async fn run_event_loop(mut self) -> anyhow::Result<()> {
131 let (account_tx, mut account_rx) =
134 mpsc::channel::<NetworkAccountId>(self.config.account_channel_capacity);
135 let account_loader_store = self.store.clone();
136 let mut account_loader_handle = tokio::spawn(async move {
137 account_loader_store
138 .stream_network_account_ids(account_tx)
139 .await
140 .context("failed to load network accounts from store")
141 });
142
143 loop {
145 tokio::select! {
146 result = self.coordinator.next() => {
148 if let Some(account_id) = result? {
149 self.coordinator
150 .spawn_actor(AccountOrigin::store(account_id), &self.actor_context);
151 }
152 },
153 event = self.mempool_events.next() => {
155 let event = event
156 .context("mempool event stream ended")?
157 .context("mempool event stream failed")?;
158
159 self.handle_mempool_event(event).await?;
160 },
161 Some(account_id) = account_rx.recv() => {
165 self.handle_loaded_account(account_id).await?;
166 },
167 Some(request) = self.actor_request_rx.recv() => {
169 self.handle_actor_request(request).await?;
170 },
171 result = &mut account_loader_handle => {
175 result
176 .context("account loader task panicked")
177 .flatten()?;
178
179 tracing::info!("account loading from store completed");
180 account_loader_handle = tokio::spawn(std::future::pending());
181 },
182 }
183 }
184 }
185
186 #[tracing::instrument(name = "ntx.builder.handle_loaded_account", skip(self, account_id))]
188 async fn handle_loaded_account(
189 &mut self,
190 account_id: NetworkAccountId,
191 ) -> Result<(), anyhow::Error> {
192 let account = self
194 .store
195 .get_network_account(account_id)
196 .await
197 .context("failed to load account from store")?
198 .context("account should exist in store")?;
199
200 let block_num = self.chain_state.read().await.chain_tip_header.block_num();
201 let notes = self
202 .store
203 .get_unconsumed_network_notes(account_id, block_num.as_u32())
204 .await
205 .context("failed to load notes from store")?;
206
207 self.db
209 .sync_account_from_store(account_id, account.clone(), notes.clone())
210 .await
211 .context("failed to sync account to DB")?;
212
213 self.coordinator
214 .spawn_actor(AccountOrigin::store(account_id), &self.actor_context);
215 Ok(())
216 }
217
218 #[tracing::instrument(name = "ntx.builder.handle_mempool_event", skip(self, event))]
220 async fn handle_mempool_event(&mut self, event: MempoolEvent) -> Result<(), anyhow::Error> {
221 match &event {
222 MempoolEvent::TransactionAdded { account_delta, .. } => {
223 self.coordinator
225 .write_event(&event)
226 .await
227 .context("failed to write TransactionAdded to DB")?;
228
229 if let Some(AccountUpdateDetails::Delta(delta)) = account_delta {
231 if let Some(network_account) = AccountOrigin::transaction(delta) {
233 let is_creating_account = delta.is_full_state();
235 if is_creating_account {
236 self.coordinator.spawn_actor(network_account, &self.actor_context);
237 }
238 }
239 }
240 let inactive_targets = self.coordinator.send_targeted(&event);
241 for account_id in inactive_targets {
242 self.coordinator
243 .spawn_actor(AccountOrigin::store(account_id), &self.actor_context);
244 }
245 Ok(())
246 },
247 MempoolEvent::BlockCommitted { header, .. } => {
249 let result = self
251 .coordinator
252 .write_event(&event)
253 .await
254 .context("failed to write BlockCommitted to DB")?;
255
256 self.update_chain_tip(header.as_ref().clone()).await;
257 self.coordinator.notify_accounts(&result.accounts_to_notify);
258 Ok(())
259 },
260 MempoolEvent::TransactionsReverted(_) => {
263 let result = self
265 .coordinator
266 .write_event(&event)
267 .await
268 .context("failed to write TransactionsReverted to DB")?;
269
270 self.coordinator.notify_accounts(&result.accounts_to_notify);
271 Ok(())
272 },
273 }
274 }
275
276 async fn handle_actor_request(&mut self, request: ActorRequest) -> Result<(), anyhow::Error> {
278 match request {
279 ActorRequest::NotesFailed { failed_notes, block_num, ack_tx } => {
280 self.db
281 .notes_failed(failed_notes, block_num)
282 .await
283 .context("failed to mark notes as failed")?;
284 let _ = ack_tx.send(());
285 },
286 ActorRequest::CacheNoteScript { script_root, script } => {
287 self.db
288 .insert_note_script(script_root, &script)
289 .await
290 .context("failed to cache note script")?;
291 },
292 }
293 Ok(())
294 }
295
296 async fn update_chain_tip(&mut self, tip: BlockHeader) {
298 let mut chain_state = self.chain_state.write().await;
299
300 let mmr_tip = chain_state.chain_tip_header.clone();
302 Arc::make_mut(&mut chain_state.chain_mmr).add_block(&mmr_tip, true);
303
304 chain_state.chain_tip_header = tip;
306
307 let pruned_block_height = (chain_state
309 .chain_mmr
310 .chain_length()
311 .as_usize()
312 .saturating_sub(self.config.max_block_count)) as u32;
313 Arc::make_mut(&mut chain_state.chain_mmr).prune_to(..pruned_block_height.into());
314 }
315}