Skip to main content

miden_ntx_builder/
builder.rs

1use std::pin::Pin;
2use std::sync::Arc;
3
4use anyhow::Context;
5use futures::Stream;
6use miden_node_utils::tasks::Tasks;
7use miden_protocol::block::{BlockNumber, SignedBlock};
8use tokio::net::TcpListener;
9use tokio::sync::mpsc;
10use tokio_stream::StreamExt;
11
12use crate::NtxBuilderConfig;
13use crate::actor::ActorRequest;
14use crate::chain_state::SharedChainState;
15use crate::clients::RpcError;
16use crate::committed_block::CommittedBlockEffects;
17use crate::coordinator::Coordinator;
18use crate::db::{Db, LoopDb};
19use crate::server::NtxBuilderRpcServer;
20
21/// Discriminator returned by the steady-state `select!` so the dispatch can run on a fully-owned
22/// `&mut self` instead of three concurrent borrows. The `Block` variant is boxed since a
23/// `SignedBlock` dwarfs the other two payloads.
24enum SteadyStateAction {
25    Block(Box<Option<Result<(SignedBlock, BlockNumber), RpcError>>>),
26    Request(Option<ActorRequest>),
27    Respawn(Option<miden_protocol::account::AccountId>),
28}
29
30// NETWORK TRANSACTION BUILDER
31// ================================================================================================
32
33/// Boxed, pinned stream of committed blocks paired with the node-reported committed chain tip at
34/// the time each block was emitted.
35///
36/// Boxing gives the stream a `'static` lifetime by ensuring it owns all its data, avoiding the
37/// complex lifetime annotations otherwise required to store `impl Stream`.
38pub(crate) type BlockStream =
39    Pin<Box<dyn Stream<Item = Result<(SignedBlock, BlockNumber), RpcError>> + Send>>;
40
41/// Network transaction builder component.
42///
43/// Runs in three phases:
44/// 1. **Catch-up**: drain the committed-block subscription, applying each block to the local DB
45///    and in-memory chain, until the local tip matches the node-reported `committed_chain_tip`
46///    (signaled by `is_synced` flipping to `true`). No actors run.
47/// 2. **Boundary**: query the DB for accounts with carry-over pending notes (e.g. from a previous
48///    process) and spawn an actor for each.
49/// 3. **Steady-state**: on every subsequent committed block, apply the effects, advance the chain,
50///    and have the coordinator spawn-if-missing for newly-targeted accounts then wake every active
51///    actor. Concurrently drain actor requests (`NotesFailed`, `CacheNoteScript`) so the actors'
52///    DB writes happen serialized through the builder.
53pub struct NetworkTransactionBuilder {
54    /// Configuration for the builder.
55    config: NtxBuilderConfig,
56    /// Database for persistent state.
57    db: Db,
58    /// Stream of committed blocks from the node RPC service.
59    block_stream: BlockStream,
60    /// Highest block number applied to the DB so far.
61    last_applied_block: BlockNumber,
62    /// In-memory partial chain shared with every spawned actor through the coordinator.
63    chain: Arc<SharedChainState>,
64    /// Lifecycle owner for `AccountActor` instances.
65    coordinator: Coordinator,
66    /// Channel receiving DB-side requests (note-failed bookkeeping, script-cache persistence) from
67    /// spawned actors. Drained in the steady-state loop so writes happen through the builder.
68    actor_request_rx: mpsc::Receiver<ActorRequest>,
69    /// `false` until the first applied block whose `committed_chain_tip` matches the just-applied
70    /// block number. Stays `true` afterwards.
71    is_synced: bool,
72}
73
74impl NetworkTransactionBuilder {
75    pub(crate) fn new(
76        config: NtxBuilderConfig,
77        db: Db,
78        block_stream: BlockStream,
79        last_applied_block: BlockNumber,
80        chain: Arc<SharedChainState>,
81        coordinator: Coordinator,
82        actor_request_rx: mpsc::Receiver<ActorRequest>,
83    ) -> Self {
84        Self {
85            config,
86            db,
87            block_stream,
88            last_applied_block,
89            chain,
90            coordinator,
91            actor_request_rx,
92            is_synced: false,
93        }
94    }
95
96    /// Returns `true` once the builder has caught up to the node's committed chain tip at least
97    /// once. Stays `true` for the lifetime of the process.
98    pub fn is_synced(&self) -> bool {
99        self.is_synced
100    }
101
102    /// Runs the network transaction builder event loop until a fatal error occurs.
103    pub async fn run(self, listener: TcpListener) -> anyhow::Result<()> {
104        let mut tasks = Tasks::new();
105
106        // Start the gRPC server.
107        let server = NtxBuilderRpcServer::new(self.db.clone(), self.config.max_note_attempts);
108        tasks.spawn("grpc-server", async move {
109            server.serve(listener).await.context("ntx-builder gRPC server failed")
110        });
111
112        tasks.spawn("event-loop", self.run_event_loop());
113
114        // Wait for either the event loop or the gRPC server to complete. Any completion is treated
115        // as fatal.
116        tasks.join_next_as_error().await.context("ntx-builder task failed")
117    }
118
119    async fn run_event_loop(mut self) -> anyhow::Result<()> {
120        // Pin a dedicated connection for the loop's DB writes so block application is never starved
121        // by the account actors competing for the shared pool.
122        let loop_db = self
123            .db
124            .pin_loop_connection()
125            .await
126            .context("failed to pin a database connection for the ntx-builder event loop")?;
127
128        // Phase 1: catch-up.
129        loop {
130            let (block, committed_tip) = self.next_block().await?;
131            let local_tip = block.header().block_num();
132            self.apply_committed_block(&loop_db, block, committed_tip).await?;
133
134            if local_tip == committed_tip {
135                self.is_synced = true;
136                tracing::info!(block.number = %committed_tip, "ntx-builder is now in sync");
137                break;
138            }
139        }
140
141        // Phase 2: spawn an actor for every account with carry-over pending notes.
142        let pending_accounts = loop_db
143            .accounts_with_pending_notes(self.config.max_note_attempts)
144            .await
145            .context("failed to load accounts with pending notes at catch-up")?;
146        tracing::info!(
147            num_accounts = pending_accounts.len(),
148            "spawning actors for accounts with carry-over pending notes",
149        );
150        for account_id in pending_accounts {
151            self.coordinator.spawn_actor(account_id);
152        }
153
154        // Phase 3: drive actors per committed block, plus serialize their DB writes.
155        loop {
156            // Split `&mut self` into disjoint borrows so each `select!` arm holds only the one
157            // field it polls. The action is materialised and self is released before the body
158            // dispatches the work via the regular `&mut self` methods.
159            let action = {
160                let block_stream = &mut self.block_stream;
161                let actor_request_rx = &mut self.actor_request_rx;
162                let coordinator = &mut self.coordinator;
163
164                tokio::select! {
165                    block = block_stream.next() => SteadyStateAction::Block(Box::new(block)),
166                    request = actor_request_rx.recv() => SteadyStateAction::Request(request),
167                    respawn = coordinator.next() => SteadyStateAction::Respawn(respawn?),
168                }
169            };
170
171            match action {
172                SteadyStateAction::Block(block) => {
173                    let (block, committed_tip) =
174                        (*block).context("block stream ended")?.context("block stream failed")?;
175                    let effects = self
176                        .apply_committed_block_with_effects(&loop_db, block, committed_tip)
177                        .await?;
178                    self.coordinator.handle_committed_block(&effects);
179                },
180                SteadyStateAction::Request(request) => {
181                    let Some(request) = request else {
182                        anyhow::bail!("actor request channel closed unexpectedly");
183                    };
184                    handle_actor_request(&loop_db, request).await?;
185                },
186                SteadyStateAction::Respawn(respawn) => {
187                    if let Some(account_id) = respawn {
188                        tracing::info!(
189                            account.id = %account_id,
190                            "respawning actor that shut down with a pending notification",
191                        );
192                        self.coordinator.spawn_actor(account_id);
193                    }
194                },
195            }
196        }
197    }
198
199    /// Pulls the next `(block, committed_tip)` pair from the subscription, surfacing both the
200    /// "stream ended" and per-item RPC errors as `anyhow::Error`.
201    async fn next_block(&mut self) -> anyhow::Result<(SignedBlock, BlockNumber)> {
202        self.block_stream
203            .next()
204            .await
205            .context("block stream ended")?
206            .context("block stream failed")
207    }
208
209    /// Applies a committed block without surfacing the computed effects.
210    async fn apply_committed_block(
211        &mut self,
212        loop_db: &LoopDb,
213        block: SignedBlock,
214        committed_tip: BlockNumber,
215    ) -> anyhow::Result<()> {
216        self.apply_committed_block_with_effects(loop_db, block, committed_tip)
217            .await
218            .map(drop)
219    }
220
221    /// Applies a committed block and returns the computed `CommittedBlockEffects` so the
222    /// steady-state loop can hand them to the coordinator without re-deriving from the signed
223    /// block.
224    #[tracing::instrument(
225        name = "ntx.builder.apply_committed_block",
226        skip(self, loop_db, block),
227        fields(block_num = %block.header().block_num(), %committed_tip),
228    )]
229    async fn apply_committed_block_with_effects(
230        &mut self,
231        loop_db: &LoopDb,
232        block: SignedBlock,
233        committed_tip: BlockNumber,
234    ) -> anyhow::Result<CommittedBlockEffects> {
235        let header = block.header().clone();
236        let block_num = header.block_num();
237
238        let effects = CommittedBlockEffects::from_signed_block(&block);
239
240        // Advance the in-memory chain (adds the previous tip header as an MMR leaf and prunes older
241        // tracked headers) before snapshotting the MMR for persistence.
242        self.chain.update_chain_tip(header, self.config.max_block_count);
243        let next_mmr = self.chain.current_mmr();
244
245        loop_db
246            .apply_committed_block(effects.clone(), next_mmr)
247            .await
248            .context("failed to apply committed block to DB")?;
249
250        self.last_applied_block = block_num;
251
252        Ok(effects)
253    }
254}
255
256/// Handles a single actor request then acknowledges the actor. Runs on the pinned loop connection
257/// so the actors' shared pool cannot starve these writes.
258async fn handle_actor_request(loop_db: &LoopDb, request: ActorRequest) -> anyhow::Result<()> {
259    match request {
260        ActorRequest::NotesFailed { failed_notes, block_num, ack_tx } => {
261            loop_db
262                .notes_failed(failed_notes, block_num)
263                .await
264                .context("failed to persist note failure")?;
265            let _ = ack_tx.send(());
266        },
267        ActorRequest::CacheNoteScript { script_root, script } => {
268            loop_db
269                .insert_note_script(script_root, &script)
270                .await
271                .context("failed to cache note script")?;
272        },
273    }
274    Ok(())
275}