zebra_consensus/router.rs
1//! Top-level semantic block verification for Zebra.
2//!
3//! Verifies blocks using the [`CheckpointVerifier`] or full [`SemanticBlockVerifier`],
4//! depending on the config and block height.
5//!
6//! # Correctness
7//!
8//! Block and transaction verification requests should be wrapped in a timeout, because:
9//! - checkpoint verification waits for previous blocks, and
10//! - full block and transaction verification wait for UTXOs from previous blocks.
11//!
12//! Otherwise, verification of out-of-order and invalid blocks and transactions can hang
13//! indefinitely.
14
15use core::fmt;
16use std::{
17 future::Future,
18 pin::Pin,
19 sync::Arc,
20 task::{Context, Poll},
21};
22
23use futures::{FutureExt, TryFutureExt};
24use thiserror::Error;
25use tokio::{sync::oneshot, task::JoinHandle};
26use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
27use tracing::{instrument, Instrument, Span};
28
29use zebra_chain::{
30 block::{self, Height},
31 parameters::{checkpoint::list::CheckpointList, Network},
32};
33
34use zebra_node_services::mempool;
35use zebra_state as zs;
36
37use crate::{
38 block::{Request, SemanticBlockVerifier, VerifyBlockError},
39 checkpoint::{CheckpointVerifier, VerifyCheckpointError},
40 error::TransactionError,
41 transaction, BoxError, Config,
42};
43
44pub mod service_trait;
45
46#[cfg(test)]
47mod tests;
48
/// The bound for the chain verifier and transaction verifier buffers.
///
/// We choose the verifier buffer bound based on the maximum number of
/// concurrent verifier users, to avoid contention:
/// - the `ChainSync` block download and verify stream
/// - the `Inbound` block download and verify stream
/// - the `Mempool` transaction download and verify stream
/// - a block miner component, which we might add in future, and
/// - 1 extra slot to avoid contention.
///
/// We deliberately add extra slots, because they only cost a small amount of
/// memory, but missing slots can significantly slow down Zebra.
///
/// [`init`] passes this bound to both the block verifier router buffer and the
/// transaction verifier buffer.
const VERIFIER_BUFFER_BOUND: usize = 5;
62
/// The block verifier router routes requests to either the checkpoint verifier or the
/// semantic block verifier, depending on the maximum checkpoint height.
///
/// # Type Parameters
///
/// - `S`: the state service, which handles `zebra_state` requests.
/// - `V`: the transaction verifier service, which handles transaction verification requests.
///
/// # Correctness
///
/// Block verification requests should be wrapped in a timeout, so that
/// out-of-order and invalid requests do not hang indefinitely. See the [`router`](`crate::router`)
/// module documentation for details.
struct BlockVerifierRouter<S, V>
where
    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
    S::Future: Send + 'static,
    V: Service<transaction::Request, Response = transaction::Response, Error = BoxError>
        + Send
        + Clone
        + 'static,
    V::Future: Send + 'static,
{
    /// The checkpointing block verifier.
    ///
    /// Always used for blocks before `Canopy`, optionally used for the entire checkpoint list.
    checkpoint: CheckpointVerifier<S>,

    /// The highest permitted checkpoint block.
    ///
    /// This height must be in the `checkpoint` verifier's checkpoint list.
    ///
    /// Blocks at or below this height are routed to the `checkpoint` verifier.
    max_checkpoint_height: block::Height,

    /// The full semantic block verifier, used for blocks after `max_checkpoint_height`.
    ///
    /// Blocks without a coinbase height are also sent here.
    block: SemanticBlockVerifier<S, V>,
}
94
/// An error while semantically verifying a block.
///
/// Each variant carries the underlying checkpoint or full block verification error
/// as its `source`.
//
// One or both of these error variants are at least 140 bytes,
// so the `source` errors are stored behind a `Box`.
//
// `missing_docs` is allowed because the `source` fields have no doc comments of their own.
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum RouterError {
    /// Block could not be checkpointed
    Checkpoint { source: Box<VerifyCheckpointError> },
    /// Block could not be full-verified
    Block { source: Box<VerifyBlockError> },
}
106
107impl fmt::Display for RouterError {
108 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109 f.write_str(&match self {
110 RouterError::Checkpoint { source } => {
111 format!("block could not be checkpointed due to: {source}")
112 }
113 RouterError::Block { source } => {
114 format!("block could not be full-verified due to: {source}")
115 }
116 })
117 }
118}
119
120impl From<VerifyCheckpointError> for RouterError {
121 fn from(err: VerifyCheckpointError) -> Self {
122 RouterError::Checkpoint {
123 source: Box::new(err),
124 }
125 }
126}
127
128impl From<VerifyBlockError> for RouterError {
129 fn from(err: VerifyBlockError) -> Self {
130 RouterError::Block {
131 source: Box::new(err),
132 }
133 }
134}
135
136impl RouterError {
137 /// Returns `true` if this is definitely a duplicate request.
138 /// Some duplicate requests might not be detected, and therefore return `false`.
139 pub fn is_duplicate_request(&self) -> bool {
140 match self {
141 RouterError::Checkpoint { source, .. } => source.is_duplicate_request(),
142 RouterError::Block { source, .. } => source.is_duplicate_request(),
143 }
144 }
145
146 /// Returns a suggested misbehaviour score increment for a certain error.
147 pub fn misbehavior_score(&self) -> u32 {
148 // TODO: Adjust these values based on zcashd (#9258).
149 match self {
150 RouterError::Checkpoint { source } => source.misbehavior_score(),
151 RouterError::Block { source } => source.misbehavior_score(),
152 }
153 }
154}
155
156impl<S, V> Service<Request> for BlockVerifierRouter<S, V>
157where
158 S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
159 S::Future: Send + 'static,
160 V: Service<transaction::Request, Response = transaction::Response, Error = BoxError>
161 + Send
162 + Clone
163 + 'static,
164 V::Future: Send + 'static,
165{
166 type Response = block::Hash;
167 type Error = RouterError;
168 type Future =
169 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
170
171 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
172 // CORRECTNESS
173 //
174 // The current task must be scheduled for wakeup every time we return
175 // `Poll::Pending`.
176 //
177 // If either verifier is unready, this task is scheduled for wakeup when it becomes
178 // ready.
179 //
180 // We acquire checkpoint readiness before block readiness, to avoid an unlikely
181 // hang during the checkpoint to block verifier transition. If the checkpoint and
182 // block verifiers are contending for the same buffer/batch, we want the checkpoint
183 // verifier to win, so that checkpoint verification completes, and block verification
184 // can start. (Buffers and batches have multiple slots, so this contention is unlikely.)
185 use futures::ready;
186 // The chain verifier holds one slot in each verifier, for each concurrent task.
187 // Therefore, any shared buffers or batches polled by these verifiers should double
188 // their bounds. (For example, the state service buffer.)
189 ready!(self.checkpoint.poll_ready(cx))?;
190 ready!(self.block.poll_ready(cx))?;
191 Poll::Ready(Ok(()))
192 }
193
194 fn call(&mut self, request: Request) -> Self::Future {
195 let block = request.block();
196
197 match block.coinbase_height() {
198 // There's currently no known use case for block proposals below the checkpoint height,
199 // so it's okay to immediately return an error here.
200 Some(height) if height <= self.max_checkpoint_height && request.is_proposal() => {
201 async {
202 // TODO: Add a `ValidateProposalError` enum with a `BelowCheckpoint` variant?
203 Err(VerifyBlockError::ValidateProposal(
204 "block proposals must be above checkpoint height".into(),
205 ))?
206 }
207 .boxed()
208 }
209
210 Some(height) if height <= self.max_checkpoint_height => {
211 self.checkpoint.call(block).map_err(Into::into).boxed()
212 }
213 // This also covers blocks with no height, which the block verifier
214 // will reject immediately.
215 _ => self.block.call(request).map_err(Into::into).boxed(),
216 }
217 }
218}
219
220/// Initialize block and transaction verification services.
221///
222/// Returns a block verifier, transaction verifier,
223/// a [`BackgroundTaskHandles`] with the state checkpoint verify task,
224/// and the maximum configured checkpoint verification height.
225///
226/// The consensus configuration is specified by `config`, and the Zcash network
227/// to verify blocks for is specified by `network`.
228///
229/// The block verification service asynchronously performs semantic verification
230/// checks. Blocks that pass semantic verification are submitted to the supplied
231/// `state_service` for contextual verification before being committed to the chain.
232///
233/// The transaction verification service asynchronously performs semantic verification
234/// checks. Transactions that pass semantic verification return an `Ok` result to the caller.
235///
236/// This function should only be called once for a particular state service.
237///
238/// Dropped requests are cancelled on a best-effort basis, but may continue to be processed.
239///
240/// # Correctness
241///
242/// Block and transaction verification requests should be wrapped in a timeout,
243/// so that out-of-order and invalid requests do not hang indefinitely.
244/// See the [`router`](`crate::router`) module documentation for details.
245#[instrument(skip(state_service, mempool))]
246pub async fn init<S, Mempool>(
247 config: Config,
248 network: &Network,
249 mut state_service: S,
250 mempool: oneshot::Receiver<Mempool>,
251) -> (
252 Buffer<BoxService<Request, block::Hash, RouterError>, Request>,
253 Buffer<
254 BoxService<transaction::Request, transaction::Response, TransactionError>,
255 transaction::Request,
256 >,
257 BackgroundTaskHandles,
258 Height,
259)
260where
261 S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
262 S::Future: Send + 'static,
263 Mempool: Service<mempool::Request, Response = mempool::Response, Error = BoxError>
264 + Send
265 + Clone
266 + 'static,
267 Mempool::Future: Send + 'static,
268{
269 // Give other tasks priority before spawning the checkpoint task.
270 tokio::task::yield_now().await;
271
272 // Make sure the state contains the known best chain checkpoints, in a separate thread.
273
274 let checkpoint_state_service = state_service.clone();
275 let checkpoint_sync = config.checkpoint_sync;
276 let checkpoint_network = network.clone();
277
278 let state_checkpoint_verify_handle = tokio::task::spawn(
279 // TODO: move this into an async function?
280 async move {
281 tracing::info!("starting state checkpoint validation");
282
283 // # Consensus
284 //
285 // We want to verify all available checkpoints, even if the node is not configured
286 // to use them for syncing. Zebra's checkpoints are updated with every release,
287 // which makes sure they include the latest settled network upgrade.
288 //
289 // > A network upgrade is settled on a given network when there is a social
290 // > consensus that it has activated with a given activation block hash.
291 // > A full validator that potentially risks Mainnet funds or displays Mainnet
292 // > transaction information to a user MUST do so only for a block chain that
293 // > includes the activation block of the most recent settled network upgrade,
294 // > with the corresponding activation block hash. Currently, there is social
295 // > consensus that NU5 has activated on the Zcash Mainnet and Testnet with the
296 // > activation block hashes given in § 3.12 ‘Mainnet and Testnet’ on p. 20.
297 //
298 // <https://zips.z.cash/protocol/protocol.pdf#blockchain>
299 let full_checkpoints = checkpoint_network.checkpoint_list();
300 let mut already_warned = false;
301
302 for (height, checkpoint_hash) in full_checkpoints.iter() {
303 let checkpoint_state_service = checkpoint_state_service.clone();
304 let request = zebra_state::Request::BestChainBlockHash(*height);
305
306 match checkpoint_state_service.oneshot(request).await {
307 Ok(zebra_state::Response::BlockHash(Some(state_hash))) => assert_eq!(
308 *checkpoint_hash, state_hash,
309 "invalid block in state: a previous Zebra instance followed an \
310 incorrect chain. Delete and re-sync your state to use the best chain"
311 ),
312
313 Ok(zebra_state::Response::BlockHash(None)) => {
314 if checkpoint_sync {
315 tracing::info!(
316 "state is not fully synced yet, remaining checkpoints will be \
317 verified during syncing"
318 );
319 } else {
320 tracing::warn!(
321 "state is not fully synced yet, remaining checkpoints will be \
322 verified next time Zebra starts up. Zebra will be less secure \
323 until it is restarted. Use consensus.checkpoint_sync = true \
324 in zebrad.toml to make sure you are following a valid chain"
325 );
326 }
327
328 break;
329 }
330
331 Ok(response) => {
332 unreachable!("unexpected response type: {response:?} from state request")
333 }
334 Err(e) => {
335 // This error happens a lot in some tests, and it could happen to users.
336 if !already_warned {
337 tracing::warn!(
338 "unexpected error: {e:?} in state request while verifying previous \
339 state checkpoints. Is Zebra shutting down?"
340 );
341 already_warned = true;
342 }
343 }
344 }
345 }
346
347 tracing::info!("finished state checkpoint validation");
348 }
349 .instrument(Span::current()),
350 );
351
352 // transaction verification
353
354 let transaction = transaction::Verifier::new(network, state_service.clone(), mempool);
355 let transaction = Buffer::new(BoxService::new(transaction), VERIFIER_BUFFER_BOUND);
356
357 // block verification
358 let (list, max_checkpoint_height) = init_checkpoint_list(config, network);
359
360 let tip = match state_service
361 .ready()
362 .await
363 .unwrap()
364 .call(zs::Request::Tip)
365 .await
366 .unwrap()
367 {
368 zs::Response::Tip(tip) => tip,
369 _ => unreachable!("wrong response to Request::Tip"),
370 };
371 tracing::info!(
372 ?tip,
373 ?max_checkpoint_height,
374 "initializing block verifier router"
375 );
376
377 let block = SemanticBlockVerifier::new(network, state_service.clone(), transaction.clone());
378 let checkpoint = CheckpointVerifier::from_checkpoint_list(list, network, tip, state_service);
379 let router = BlockVerifierRouter {
380 checkpoint,
381 max_checkpoint_height,
382 block,
383 };
384
385 let router = Buffer::new(BoxService::new(router), VERIFIER_BUFFER_BOUND);
386
387 let task_handles = BackgroundTaskHandles {
388 state_checkpoint_verify_handle,
389 };
390
391 (router, transaction, task_handles, max_checkpoint_height)
392}
393
394/// Parses the checkpoint list for `network` and `config`.
395/// Returns the checkpoint list and maximum checkpoint height.
396pub fn init_checkpoint_list(config: Config, network: &Network) -> (Arc<CheckpointList>, Height) {
397 // TODO: Zebra parses the checkpoint list three times at startup.
398 // Instead, cache the checkpoint list for each `network`.
399 let list = network.checkpoint_list();
400
401 let max_checkpoint_height = if config.checkpoint_sync {
402 list.max_height()
403 } else {
404 list.min_height_in_range(network.mandatory_checkpoint_height()..)
405 .expect("hardcoded checkpoint list extends past canopy activation")
406 };
407
408 (list, max_checkpoint_height)
409}
410
/// The background task handles for `zebra-consensus` verifier initialization.
///
/// Returned by [`init`], which spawns the task before building the verifiers.
#[derive(Debug)]
pub struct BackgroundTaskHandles {
    /// A handle to the state checkpoint verify task.
    /// Finishes when all the checkpoints are verified, or when the state tip is reached.
    ///
    /// The task panics if a block hash in the state does not match its checkpoint hash.
    pub state_checkpoint_verify_handle: JoinHandle<()>,
}
418
/// Calls [`init`] with a closed mempool setup channel for conciseness in tests.
///
/// The mempool channel sender is dropped as soon as the channel is created,
/// so only the receiver is passed on to [`init`].
///
/// See [`init`] for more details.
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test<S>(
    config: Config,
    network: &Network,
    state_service: S,
) -> (
    Buffer<BoxService<Request, block::Hash, RouterError>, Request>,
    Buffer<
        BoxService<transaction::Request, transaction::Response, TransactionError>,
        transaction::Request,
    >,
    BackgroundTaskHandles,
    Height,
)
where
    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
    S::Future: Send + 'static,
{
    // `config` and `state_service` are owned and not used again, so they are moved
    // into `init` rather than cloned.
    init(
        config,
        network,
        state_service,
        oneshot::channel::<
            Buffer<BoxService<mempool::Request, mempool::Response, BoxError>, mempool::Request>,
        >()
        .1,
    )
    .await
}