1use std::sync::Arc;
2
3use miden_node_proto::domain::proof_request::BlockProofRequest;
4use miden_node_utils::ErrorReport;
5use miden_protocol::Word;
6use miden_protocol::account::delta::AccountUpdateDetails;
7use miden_protocol::batch::OrderedBatches;
8use miden_protocol::block::account_tree::AccountMutationSet;
9use miden_protocol::block::nullifier_tree::NullifierMutationSet;
10use miden_protocol::block::{BlockBody, BlockHeader, BlockInputs, BlockNumber, SignedBlock};
11use miden_protocol::note::{NoteDetails, Nullifier};
12use miden_protocol::transaction::OutputNote;
13use miden_protocol::utils::serde::Serializable;
14use tokio::sync::oneshot;
15use tracing::{Instrument, info, info_span, instrument};
16
17use crate::db::NoteRecord;
18use crate::errors::{ApplyBlockError, ApplyBlockWithProvingInputsError, InvalidBlockError};
19use crate::state::{BlockNotification, State};
20use crate::{COMPONENT, HistoricalError};
21
22impl State {
23 #[instrument(target = COMPONENT, skip_all, err)]
27 pub async fn apply_block_with_proving_inputs(
28 &self,
29 ordered_batches: OrderedBatches,
30 block_inputs: BlockInputs,
31 signed_block: SignedBlock,
32 ) -> Result<(), ApplyBlockWithProvingInputsError> {
33 let block_header = signed_block.header().clone();
34 let block_num = block_header.block_num();
35
36 let proving_inputs = BlockProofRequest {
37 tx_batches: ordered_batches,
38 block_header,
39 block_inputs,
40 };
41
42 self.save_proving_inputs(block_num, &proving_inputs)
43 .await
44 .map_err(ApplyBlockWithProvingInputsError::SaveProvingInputs)?;
45
46 self.apply_block(signed_block)
47 .await
48 .map_err(ApplyBlockWithProvingInputsError::ApplyBlock)
49 }
50
51 #[instrument(target = COMPONENT, skip_all, err)]
76 pub async fn apply_block(&self, signed_block: SignedBlock) -> Result<(), ApplyBlockError> {
77 let _lock = self.writer.try_lock().map_err(|_| ApplyBlockError::ConcurrentWrite)?;
78
79 let header = signed_block.header();
80 let body = signed_block.body();
81
82 let block_num = header.block_num();
83 let block_commitment = header.commitment();
84
85 self.validate_block_header(header, body).await?;
86
87 let signed_block_bytes = signed_block.to_bytes();
91 let cache_bytes = signed_block_bytes.clone();
93 let store = Arc::clone(&self.block_store);
94 let block_save_task = tokio::spawn(
95 async move { store.save_block(block_num, &signed_block_bytes).await }.in_current_span(),
96 );
97
98 let (
99 nullifier_tree_old_root,
100 nullifier_tree_update,
101 account_tree_old_root,
102 account_tree_update,
103 ) = self.compute_tree_mutations(header, body).await?;
104
105 let notes = Self::build_note_records(header, body)?;
106
107 let (allow_acquire, acquired_allowed) = oneshot::channel::<()>();
109 let (inform_acquire_done, acquire_done) = oneshot::channel::<()>();
111
112 let account_deltas =
115 Vec::from_iter(body.updated_accounts().iter().filter_map(
116 |update| match update.details() {
117 AccountUpdateDetails::Delta(delta) => Some(delta.clone()),
118 AccountUpdateDetails::Private => None,
119 },
120 ));
121
122 let db = Arc::clone(&self.db);
126 let db_update_task = tokio::spawn(
127 async move { db.apply_block(allow_acquire, acquire_done, signed_block, notes).await }
128 .in_current_span(),
129 );
130
131 acquired_allowed
133 .instrument(info_span!(target: COMPONENT, "await_db_readiness"))
134 .await
135 .map_err(ApplyBlockError::ClosedChannel)?;
136
137 block_save_task.await??;
139
140 self.with_inner_write_blocking(|inner| {
141 if inner.nullifier_tree.root() != nullifier_tree_old_root
146 || inner.account_tree.root_latest() != account_tree_old_root
147 {
148 return Err(ApplyBlockError::ConcurrentWrite);
149 }
150
151 inform_acquire_done
154 .send(())
155 .map_err(|_| ApplyBlockError::DbUpdateTaskFailed("Receiver was dropped".into()))?;
156
157 tokio::runtime::Handle::current()
161 .block_on(db_update_task)?
162 .map_err(|err| ApplyBlockError::DbUpdateTaskFailed(err.as_report()))?;
163
164 inner
166 .nullifier_tree
167 .apply_mutations(nullifier_tree_update)
168 .expect("Unreachable: old nullifier tree root must be checked before this step");
169 inner
170 .account_tree
171 .apply_mutations(account_tree_update)
172 .expect("Unreachable: old account tree root must be checked before this step");
173
174 inner.blockchain.push(block_commitment);
175
176 Ok(())
177 })?;
178
179 self.with_forest_write_blocking(|forest| {
180 forest.apply_block_updates(block_num, account_deltas)
181 })?;
182
183 self.block_cache
185 .push(block_num, BlockNotification::new(block_num, cache_bytes))
186 .expect("block cache receives sequential block numbers");
187 let _ = self.committed_tip_tx.send(block_num);
188
189 info!(%block_commitment, block_num = block_num.as_u32(), COMPONENT, "apply_block successful");
190
191 Ok(())
192 }
193
194 pub async fn save_proving_inputs(
196 &self,
197 block_num: BlockNumber,
198 proving_inputs: &BlockProofRequest,
199 ) -> std::io::Result<()> {
200 self.block_store
201 .save_proving_inputs(block_num, &proving_inputs.to_bytes())
202 .await
203 }
204
205 #[instrument(target = COMPONENT, skip_all, err)]
207 async fn validate_block_header(
208 &self,
209 header: &BlockHeader,
210 body: &BlockBody,
211 ) -> Result<(), ApplyBlockError> {
212 let tx_commitment = body.transactions().commitment();
214 if header.tx_commitment() != tx_commitment {
215 return Err(InvalidBlockError::InvalidBlockTxCommitment {
216 expected: tx_commitment,
217 actual: header.tx_commitment(),
218 }
219 .into());
220 }
221
222 let block_num = header.block_num();
223
224 let prev_block = self
226 .db
227 .select_block_header_by_block_num(None)
228 .await?
229 .ok_or(ApplyBlockError::DbBlockHeaderEmpty)?;
230 let expected_block_num = prev_block.block_num().child();
231 if block_num != expected_block_num {
232 return Err(InvalidBlockError::NewBlockInvalidBlockNum {
233 expected: expected_block_num,
234 submitted: block_num,
235 }
236 .into());
237 }
238 if header.prev_block_commitment() != prev_block.commitment() {
239 return Err(InvalidBlockError::NewBlockInvalidPrevCommitment.into());
240 }
241
242 Ok(())
243 }
244
245 #[instrument(target = COMPONENT, skip_all, err)]
247 async fn compute_tree_mutations(
248 &self,
249 header: &BlockHeader,
250 body: &BlockBody,
251 ) -> Result<(Word, NullifierMutationSet, Word, AccountMutationSet), ApplyBlockError> {
252 self.with_inner_read_blocking(|inner| {
253 let block_num = header.block_num();
254
255 let duplicate_nullifiers: Vec<_> = body
257 .created_nullifiers()
258 .iter()
259 .filter(|&nullifier| inner.nullifier_tree.get_block_num(nullifier).is_some())
260 .copied()
261 .collect();
262 if !duplicate_nullifiers.is_empty() {
263 return Err(InvalidBlockError::DuplicatedNullifiers(duplicate_nullifiers).into());
264 }
265
266 let peaks = inner.blockchain.peaks();
268 if peaks.hash_peaks() != header.chain_commitment() {
269 return Err(InvalidBlockError::NewBlockInvalidChainCommitment.into());
270 }
271
272 let nullifier_tree_update = inner
274 .nullifier_tree
275 .compute_mutations(
276 body.created_nullifiers().iter().map(|nullifier| (*nullifier, block_num)),
277 )
278 .map_err(InvalidBlockError::NewBlockNullifierAlreadySpent)?;
279
280 if nullifier_tree_update.as_mutation_set().root() != header.nullifier_root() {
281 return Err(InvalidBlockError::NewBlockInvalidNullifierRoot.into());
282 }
283
284 let account_tree_update = inner
286 .account_tree
287 .compute_mutations(
288 body.updated_accounts()
289 .iter()
290 .map(|update| (update.account_id(), update.final_state_commitment())),
291 )
292 .map_err(|e| match e {
293 HistoricalError::AccountTreeError(err) => {
294 InvalidBlockError::NewBlockDuplicateAccountIdPrefix(err)
295 },
296 HistoricalError::MerkleError(_) => {
297 panic!("Unexpected MerkleError during account tree mutation computation")
298 },
299 })?;
300
301 if account_tree_update.as_mutation_set().root() != header.account_root() {
302 return Err(InvalidBlockError::NewBlockInvalidAccountRoot.into());
303 }
304
305 Ok((
306 inner.nullifier_tree.root(),
307 nullifier_tree_update,
308 inner.account_tree.root_latest(),
309 account_tree_update,
310 ))
311 })
312 }
313
314 #[instrument(target = COMPONENT, skip_all, err)]
316 fn build_note_records(
317 header: &BlockHeader,
318 body: &BlockBody,
319 ) -> Result<Vec<(NoteRecord, Option<Nullifier>)>, ApplyBlockError> {
320 let block_num = header.block_num();
321
322 let note_tree = body.compute_block_note_tree();
323 if note_tree.root() != header.note_root() {
324 return Err(InvalidBlockError::NewBlockInvalidNoteRoot.into());
325 }
326
327 let notes = body
328 .output_notes()
329 .map(|(note_index, note)| {
330 let (details, attachments, nullifier) = match note {
331 OutputNote::Public(public) => (
332 Some(NoteDetails::from(public.as_note())),
333 public.as_note().attachments().clone(),
334 Some(public.as_note().nullifier()),
335 ),
336 OutputNote::Private(private) => (None, private.attachments().clone(), None),
337 };
338
339 let inclusion_path = note_tree.open(note_index);
340
341 let note_record = NoteRecord {
342 block_num,
343 note_index,
344 note_id: note.id().as_word(),
345 metadata: *note.metadata(),
346 details,
347 attachments,
348 inclusion_path,
349 };
350
351 Ok((note_record, nullifier))
352 })
353 .collect::<Result<Vec<_>, InvalidBlockError>>()?;
354
355 Ok(notes)
356 }
357}