miden_ntx_builder/
builder.rs1use std::pin::Pin;
2use std::sync::Arc;
3
4use anyhow::Context;
5use futures::Stream;
6use miden_node_utils::tasks::Tasks;
7use miden_protocol::block::{BlockNumber, SignedBlock};
8use tokio::net::TcpListener;
9use tokio::sync::mpsc;
10use tokio_stream::StreamExt;
11
12use crate::NtxBuilderConfig;
13use crate::actor::ActorRequest;
14use crate::chain_state::SharedChainState;
15use crate::clients::RpcError;
16use crate::committed_block::CommittedBlockEffects;
17use crate::coordinator::Coordinator;
18use crate::db::{Db, LoopDb};
19use crate::server::NtxBuilderRpcServer;
20
21enum SteadyStateAction {
25 Block(Box<Option<Result<(SignedBlock, BlockNumber), RpcError>>>),
26 Request(Option<ActorRequest>),
27 Respawn(Option<miden_protocol::account::AccountId>),
28}
29
30pub(crate) type BlockStream =
39 Pin<Box<dyn Stream<Item = Result<(SignedBlock, BlockNumber), RpcError>> + Send>>;
40
41pub struct NetworkTransactionBuilder {
54 config: NtxBuilderConfig,
56 db: Db,
58 block_stream: BlockStream,
60 last_applied_block: BlockNumber,
62 chain: Arc<SharedChainState>,
64 coordinator: Coordinator,
66 actor_request_rx: mpsc::Receiver<ActorRequest>,
69 is_synced: bool,
72}
73
74impl NetworkTransactionBuilder {
75 pub(crate) fn new(
76 config: NtxBuilderConfig,
77 db: Db,
78 block_stream: BlockStream,
79 last_applied_block: BlockNumber,
80 chain: Arc<SharedChainState>,
81 coordinator: Coordinator,
82 actor_request_rx: mpsc::Receiver<ActorRequest>,
83 ) -> Self {
84 Self {
85 config,
86 db,
87 block_stream,
88 last_applied_block,
89 chain,
90 coordinator,
91 actor_request_rx,
92 is_synced: false,
93 }
94 }
95
96 pub fn is_synced(&self) -> bool {
99 self.is_synced
100 }
101
102 pub async fn run(self, listener: TcpListener) -> anyhow::Result<()> {
104 let mut tasks = Tasks::new();
105
106 let server = NtxBuilderRpcServer::new(self.db.clone(), self.config.max_note_attempts);
108 tasks.spawn("grpc-server", async move {
109 server.serve(listener).await.context("ntx-builder gRPC server failed")
110 });
111
112 tasks.spawn("event-loop", self.run_event_loop());
113
114 tasks.join_next_as_error().await.context("ntx-builder task failed")
117 }
118
119 async fn run_event_loop(mut self) -> anyhow::Result<()> {
120 let loop_db = self
123 .db
124 .pin_loop_connection()
125 .await
126 .context("failed to pin a database connection for the ntx-builder event loop")?;
127
128 loop {
130 let (block, committed_tip) = self.next_block().await?;
131 let local_tip = block.header().block_num();
132 self.apply_committed_block(&loop_db, block, committed_tip).await?;
133
134 if local_tip == committed_tip {
135 self.is_synced = true;
136 tracing::info!(block.number = %committed_tip, "ntx-builder is now in sync");
137 break;
138 }
139 }
140
141 let pending_accounts = loop_db
143 .accounts_with_pending_notes(self.config.max_note_attempts)
144 .await
145 .context("failed to load accounts with pending notes at catch-up")?;
146 tracing::info!(
147 num_accounts = pending_accounts.len(),
148 "spawning actors for accounts with carry-over pending notes",
149 );
150 for account_id in pending_accounts {
151 self.coordinator.spawn_actor(account_id);
152 }
153
154 loop {
156 let action = {
160 let block_stream = &mut self.block_stream;
161 let actor_request_rx = &mut self.actor_request_rx;
162 let coordinator = &mut self.coordinator;
163
164 tokio::select! {
165 block = block_stream.next() => SteadyStateAction::Block(Box::new(block)),
166 request = actor_request_rx.recv() => SteadyStateAction::Request(request),
167 respawn = coordinator.next() => SteadyStateAction::Respawn(respawn?),
168 }
169 };
170
171 match action {
172 SteadyStateAction::Block(block) => {
173 let (block, committed_tip) =
174 (*block).context("block stream ended")?.context("block stream failed")?;
175 let effects = self
176 .apply_committed_block_with_effects(&loop_db, block, committed_tip)
177 .await?;
178 self.coordinator.handle_committed_block(&effects);
179 },
180 SteadyStateAction::Request(request) => {
181 let Some(request) = request else {
182 anyhow::bail!("actor request channel closed unexpectedly");
183 };
184 handle_actor_request(&loop_db, request).await?;
185 },
186 SteadyStateAction::Respawn(respawn) => {
187 if let Some(account_id) = respawn {
188 tracing::info!(
189 account.id = %account_id,
190 "respawning actor that shut down with a pending notification",
191 );
192 self.coordinator.spawn_actor(account_id);
193 }
194 },
195 }
196 }
197 }
198
199 async fn next_block(&mut self) -> anyhow::Result<(SignedBlock, BlockNumber)> {
202 self.block_stream
203 .next()
204 .await
205 .context("block stream ended")?
206 .context("block stream failed")
207 }
208
209 async fn apply_committed_block(
211 &mut self,
212 loop_db: &LoopDb,
213 block: SignedBlock,
214 committed_tip: BlockNumber,
215 ) -> anyhow::Result<()> {
216 self.apply_committed_block_with_effects(loop_db, block, committed_tip)
217 .await
218 .map(drop)
219 }
220
221 #[tracing::instrument(
225 name = "ntx.builder.apply_committed_block",
226 skip(self, loop_db, block),
227 fields(block_num = %block.header().block_num(), %committed_tip),
228 )]
229 async fn apply_committed_block_with_effects(
230 &mut self,
231 loop_db: &LoopDb,
232 block: SignedBlock,
233 committed_tip: BlockNumber,
234 ) -> anyhow::Result<CommittedBlockEffects> {
235 let header = block.header().clone();
236 let block_num = header.block_num();
237
238 let effects = CommittedBlockEffects::from_signed_block(&block);
239
240 self.chain.update_chain_tip(header, self.config.max_block_count);
243 let next_mmr = self.chain.current_mmr();
244
245 loop_db
246 .apply_committed_block(effects.clone(), next_mmr)
247 .await
248 .context("failed to apply committed block to DB")?;
249
250 self.last_applied_block = block_num;
251
252 Ok(effects)
253 }
254}
255
256async fn handle_actor_request(loop_db: &LoopDb, request: ActorRequest) -> anyhow::Result<()> {
259 match request {
260 ActorRequest::NotesFailed { failed_notes, block_num, ack_tx } => {
261 loop_db
262 .notes_failed(failed_notes, block_num)
263 .await
264 .context("failed to persist note failure")?;
265 let _ = ack_tx.send(());
266 },
267 ActorRequest::CacheNoteScript { script_root, script } => {
268 loop_db
269 .insert_note_script(script_root, &script)
270 .await
271 .context("failed to cache note script")?;
272 },
273 }
274 Ok(())
275}