Skip to main content

miden_node_store/state/
apply_block.rs

1use std::sync::Arc;
2
3use miden_node_proto::domain::proof_request::BlockProofRequest;
4use miden_node_utils::ErrorReport;
5use miden_protocol::Word;
6use miden_protocol::account::delta::AccountUpdateDetails;
7use miden_protocol::batch::OrderedBatches;
8use miden_protocol::block::account_tree::AccountMutationSet;
9use miden_protocol::block::nullifier_tree::NullifierMutationSet;
10use miden_protocol::block::{BlockBody, BlockHeader, BlockInputs, BlockNumber, SignedBlock};
11use miden_protocol::note::{NoteDetails, Nullifier};
12use miden_protocol::transaction::OutputNote;
13use miden_protocol::utils::serde::Serializable;
14use tokio::sync::oneshot;
15use tracing::{Instrument, info, info_span, instrument};
16
17use crate::db::NoteRecord;
18use crate::errors::{ApplyBlockError, ApplyBlockWithProvingInputsError, InvalidBlockError};
19use crate::state::{BlockNotification, State};
20use crate::{COMPONENT, HistoricalError};
21
22impl State {
23    /// Saves proving inputs for a signed block and applies it to the state.
24    ///
25    /// Used by the in-process block producer after it has built and signed a block.
26    #[instrument(target = COMPONENT, skip_all, err)]
27    pub async fn apply_block_with_proving_inputs(
28        &self,
29        ordered_batches: OrderedBatches,
30        block_inputs: BlockInputs,
31        signed_block: SignedBlock,
32    ) -> Result<(), ApplyBlockWithProvingInputsError> {
33        let block_header = signed_block.header().clone();
34        let block_num = block_header.block_num();
35
36        let proving_inputs = BlockProofRequest {
37            tx_batches: ordered_batches,
38            block_header,
39            block_inputs,
40        };
41
42        self.save_proving_inputs(block_num, &proving_inputs)
43            .await
44            .map_err(ApplyBlockWithProvingInputsError::SaveProvingInputs)?;
45
46        self.apply_block(signed_block)
47            .await
48            .map_err(ApplyBlockWithProvingInputsError::ApplyBlock)
49    }
50
    /// Apply changes of a new block to the DB and in-memory data structures.
    ///
    /// ## Note on state consistency
    ///
    /// The server keeps in-memory representations of the existing trees. These must be kept
    /// consistent with the committed data in order to provide consistent results for all
    /// endpoints. To achieve this, the following steps are used:
    ///
    /// - the request data is validated, prior to starting any modifications.
    /// - the block is saved into the block store in parallel with updating the DB, but before
    ///   committing. This block is considered a candidate and is not yet available for reading
    ///   because the latest block pointer is not updated yet.
    /// - a transaction is opened in the DB and the writes are started.
    /// - while the transaction is not committed, concurrent reads are allowed on both the DB and
    ///   the in-memory representations, which are consistent at this stage.
    /// - prior to committing the changes to the DB, an exclusive lock on the in-memory data is
    ///   acquired, preventing concurrent reads of the in-memory data, since it would be
    ///   out-of-sync w.r.t. the DB.
    /// - the DB transaction is committed, and requests that read only from the DB can proceed to
    ///   use the fresh data.
    /// - the in-memory structures are updated, including the latest block pointer, and the lock
    ///   is released.
    ///
    /// ## Errors
    ///
    /// - [`ApplyBlockError::ConcurrentWrite`] if the writer lock cannot be taken immediately, or
    ///   if the nullifier/account tree roots changed between computing the mutations and
    ///   acquiring write access to the in-memory state.
    /// - [`ApplyBlockError::ClosedChannel`] if the DB update task drops its readiness sender
    ///   before signalling.
    /// - [`ApplyBlockError::DbUpdateTaskFailed`] if the DB update task's receiver is gone or the
    ///   task itself returns an error.
    /// - Any error propagated from header validation, tree mutation computation, note record
    ///   construction, the block-save task, joining the spawned tasks, or the forest update.
    ///
    /// ## Panics
    ///
    /// Panics if applying the precomputed tree mutations fails after the root check, or if the
    /// block cache rejects the pushed block (see the `expect` messages below).
    // TODO: This span is logged in a root span, we should connect it to the parent span.
    #[instrument(target = COMPONENT, skip_all, err)]
    pub async fn apply_block(&self, signed_block: SignedBlock) -> Result<(), ApplyBlockError> {
        // Fail fast instead of queueing: only one block application may be in flight. The guard
        // is held until this function returns.
        let _lock = self.writer.try_lock().map_err(|_| ApplyBlockError::ConcurrentWrite)?;

        let header = signed_block.header();
        let body = signed_block.body();

        // Taken as owned values up front; they are still needed after `signed_block` is moved
        // into the DB update task further down.
        let block_num = header.block_num();
        let block_commitment = header.commitment();

        self.validate_block_header(header, body).await?;

        // Save the block to the block store. In case of a rolled-back DB transaction, the
        // in-memory state will be unchanged, but the file might still be written. Such blocks
        // should be considered candidates, not finalized blocks.
        let signed_block_bytes = signed_block.to_bytes();
        // Clone before moving into the block-save task so we can cache for replicas at commit.
        let cache_bytes = signed_block_bytes.clone();
        let store = Arc::clone(&self.block_store);
        let block_save_task = tokio::spawn(
            async move { store.save_block(block_num, &signed_block_bytes).await }.in_current_span(),
        );

        // The old roots are kept so we can later detect whether the trees changed while the DB
        // transaction was being prepared.
        let (
            nullifier_tree_old_root,
            nullifier_tree_update,
            account_tree_old_root,
            account_tree_update,
        ) = self.compute_tree_mutations(header, body).await?;

        let notes = Self::build_note_records(header, body)?;

        // Signals the transaction is ready to be committed, and the write lock can be acquired.
        let (allow_acquire, acquired_allowed) = oneshot::channel::<()>();
        // Signals the write lock has been acquired, and the transaction can be committed.
        let (inform_acquire_done, acquire_done) = oneshot::channel::<()>();

        // Extract public account updates with deltas before the block is moved into the async
        // task. Private accounts are filtered out since they don't expose their state changes.
        let account_deltas =
            Vec::from_iter(body.updated_accounts().iter().filter_map(
                |update| match update.details() {
                    AccountUpdateDetails::Delta(delta) => Some(delta.clone()),
                    AccountUpdateDetails::Private => None,
                },
            ));

        // The DB and in-memory state updates need to be synchronized and are partially overlapping.
        // Namely, the DB transaction only proceeds after this task acquires the in-memory write
        // lock. This requires the DB update to run concurrently, so a new task is spawned.
        let db = Arc::clone(&self.db);
        let db_update_task = tokio::spawn(
            async move { db.apply_block(allow_acquire, acquire_done, signed_block, notes).await }
                .in_current_span(),
        );

        // Wait for the message from the DB update task that we are ready to commit the DB
        // transaction.
        acquired_allowed
            .instrument(info_span!(target: COMPONENT, "await_db_readiness"))
            .await
            .map_err(ApplyBlockError::ClosedChannel)?;

        // Await the block saving task; the first `?` surfaces a failure to join the task, the
        // second the error returned by `save_block` itself.
        block_save_task.await??;

        self.with_inner_write_blocking(|inner| {
            // We need to check that neither the nullifier tree nor the account tree have changed
            // while we were waiting for the DB preparation task to complete. If either of them did
            // change, we do not proceed with in-memory and database updates, since it may lead to
            // an inconsistent state.
            if inner.nullifier_tree.root() != nullifier_tree_old_root
                || inner.account_tree.root_latest() != account_tree_old_root
            {
                return Err(ApplyBlockError::ConcurrentWrite);
            }

            // Notify the DB update task that the write lock has been acquired, so it can commit the
            // DB transaction.
            inform_acquire_done
                .send(())
                .map_err(|_| ApplyBlockError::DbUpdateTaskFailed("Receiver was dropped".into()))?;

            // TODO: shutdown #91 Await for successful commit of the DB transaction. If the commit
            // fails, we mustn't change in-memory state, so we return a block applying error and
            // don't proceed with in-memory updates.
            //
            // This blocks the current thread until the DB update task has finished; the first `?`
            // surfaces a failure to join the task, `map_err` wraps the task's own error.
            tokio::runtime::Handle::current()
                .block_on(db_update_task)?
                .map_err(|err| ApplyBlockError::DbUpdateTaskFailed(err.as_report()))?;

            // Update the in-memory data structures after successful commit of the DB transaction
            inner
                .nullifier_tree
                .apply_mutations(nullifier_tree_update)
                .expect("Unreachable: old nullifier tree root must be checked before this step");
            inner
                .account_tree
                .apply_mutations(account_tree_update)
                .expect("Unreachable: old account tree root must be checked before this step");

            inner.blockchain.push(block_commitment);

            Ok(())
        })?;

        // NOTE(review): if this fails, the DB transaction and the in-memory tree updates above
        // have already been applied, and the cache push / tip notification below are skipped —
        // confirm that this is acceptable.
        self.with_forest_write_blocking(|forest| {
            forest.apply_block_updates(block_num, account_deltas)
        })?;

        // Push to cache and notify replica subscribers. The result of the tip notification is
        // deliberately ignored.
        self.block_cache
            .push(block_num, BlockNotification::new(block_num, cache_bytes))
            .expect("block cache receives sequential block numbers");
        let _ = self.committed_tip_tx.send(block_num);

        // NOTE(review): `COMPONENT` is passed here as a plain field, not as `target: COMPONENT` —
        // confirm this is intended.
        info!(%block_commitment, block_num = block_num.as_u32(), COMPONENT, "apply_block successful");

        Ok(())
    }
193
194    /// Saves the proving inputs for the given block to the block store.
195    pub async fn save_proving_inputs(
196        &self,
197        block_num: BlockNumber,
198        proving_inputs: &BlockProofRequest,
199    ) -> std::io::Result<()> {
200        self.block_store
201            .save_proving_inputs(block_num, &proving_inputs.to_bytes())
202            .await
203    }
204
205    /// Validates that the block header is consistent with the block body and the current state.
206    #[instrument(target = COMPONENT, skip_all, err)]
207    async fn validate_block_header(
208        &self,
209        header: &BlockHeader,
210        body: &BlockBody,
211    ) -> Result<(), ApplyBlockError> {
212        // Validate that header and body match.
213        let tx_commitment = body.transactions().commitment();
214        if header.tx_commitment() != tx_commitment {
215            return Err(InvalidBlockError::InvalidBlockTxCommitment {
216                expected: tx_commitment,
217                actual: header.tx_commitment(),
218            }
219            .into());
220        }
221
222        let block_num = header.block_num();
223
224        // Validate that the applied block is the next block in sequence.
225        let prev_block = self
226            .db
227            .select_block_header_by_block_num(None)
228            .await?
229            .ok_or(ApplyBlockError::DbBlockHeaderEmpty)?;
230        let expected_block_num = prev_block.block_num().child();
231        if block_num != expected_block_num {
232            return Err(InvalidBlockError::NewBlockInvalidBlockNum {
233                expected: expected_block_num,
234                submitted: block_num,
235            }
236            .into());
237        }
238        if header.prev_block_commitment() != prev_block.commitment() {
239            return Err(InvalidBlockError::NewBlockInvalidPrevCommitment.into());
240        }
241
242        Ok(())
243    }
244
245    /// Computes nullifier and account tree mutations, validating roots against the block header.
246    #[instrument(target = COMPONENT, skip_all, err)]
247    async fn compute_tree_mutations(
248        &self,
249        header: &BlockHeader,
250        body: &BlockBody,
251    ) -> Result<(Word, NullifierMutationSet, Word, AccountMutationSet), ApplyBlockError> {
252        self.with_inner_read_blocking(|inner| {
253            let block_num = header.block_num();
254
255            // nullifiers can be produced only once
256            let duplicate_nullifiers: Vec<_> = body
257                .created_nullifiers()
258                .iter()
259                .filter(|&nullifier| inner.nullifier_tree.get_block_num(nullifier).is_some())
260                .copied()
261                .collect();
262            if !duplicate_nullifiers.is_empty() {
263                return Err(InvalidBlockError::DuplicatedNullifiers(duplicate_nullifiers).into());
264            }
265
266            // new_block.chain_root must be equal to the chain MMR root prior to the update
267            let peaks = inner.blockchain.peaks();
268            if peaks.hash_peaks() != header.chain_commitment() {
269                return Err(InvalidBlockError::NewBlockInvalidChainCommitment.into());
270            }
271
272            // compute update for nullifier tree
273            let nullifier_tree_update = inner
274                .nullifier_tree
275                .compute_mutations(
276                    body.created_nullifiers().iter().map(|nullifier| (*nullifier, block_num)),
277                )
278                .map_err(InvalidBlockError::NewBlockNullifierAlreadySpent)?;
279
280            if nullifier_tree_update.as_mutation_set().root() != header.nullifier_root() {
281                return Err(InvalidBlockError::NewBlockInvalidNullifierRoot.into());
282            }
283
284            // compute update for account tree
285            let account_tree_update = inner
286                .account_tree
287                .compute_mutations(
288                    body.updated_accounts()
289                        .iter()
290                        .map(|update| (update.account_id(), update.final_state_commitment())),
291                )
292                .map_err(|e| match e {
293                    HistoricalError::AccountTreeError(err) => {
294                        InvalidBlockError::NewBlockDuplicateAccountIdPrefix(err)
295                    },
296                    HistoricalError::MerkleError(_) => {
297                        panic!("Unexpected MerkleError during account tree mutation computation")
298                    },
299                })?;
300
301            if account_tree_update.as_mutation_set().root() != header.account_root() {
302                return Err(InvalidBlockError::NewBlockInvalidAccountRoot.into());
303            }
304
305            Ok((
306                inner.nullifier_tree.root(),
307                nullifier_tree_update,
308                inner.account_tree.root_latest(),
309                account_tree_update,
310            ))
311        })
312    }
313
314    /// Builds note records with inclusion proofs from the block body.
315    #[instrument(target = COMPONENT, skip_all, err)]
316    fn build_note_records(
317        header: &BlockHeader,
318        body: &BlockBody,
319    ) -> Result<Vec<(NoteRecord, Option<Nullifier>)>, ApplyBlockError> {
320        let block_num = header.block_num();
321
322        let note_tree = body.compute_block_note_tree();
323        if note_tree.root() != header.note_root() {
324            return Err(InvalidBlockError::NewBlockInvalidNoteRoot.into());
325        }
326
327        let notes = body
328            .output_notes()
329            .map(|(note_index, note)| {
330                let (details, attachments, nullifier) = match note {
331                    OutputNote::Public(public) => (
332                        Some(NoteDetails::from(public.as_note())),
333                        public.as_note().attachments().clone(),
334                        Some(public.as_note().nullifier()),
335                    ),
336                    OutputNote::Private(private) => (None, private.attachments().clone(), None),
337                };
338
339                let inclusion_path = note_tree.open(note_index);
340
341                let note_record = NoteRecord {
342                    block_num,
343                    note_index,
344                    note_id: note.id().as_word(),
345                    metadata: *note.metadata(),
346                    details,
347                    attachments,
348                    inclusion_path,
349                };
350
351                Ok((note_record, nullifier))
352            })
353            .collect::<Result<Vec<_>, InvalidBlockError>>()?;
354
355        Ok(notes)
356    }
357}