zebra_consensus/
router.rs

1//! Top-level semantic block verification for Zebra.
2//!
3//! Verifies blocks using the [`CheckpointVerifier`] or full [`SemanticBlockVerifier`],
4//! depending on the config and block height.
5//!
6//! # Correctness
7//!
8//! Block and transaction verification requests should be wrapped in a timeout, because:
9//! - checkpoint verification waits for previous blocks, and
10//! - full block and transaction verification wait for UTXOs from previous blocks.
11//!
12//! Otherwise, verification of out-of-order and invalid blocks and transactions can hang
13//! indefinitely.
14
15use core::fmt;
16use std::{
17    future::Future,
18    pin::Pin,
19    sync::Arc,
20    task::{Context, Poll},
21};
22
23use futures::{FutureExt, TryFutureExt};
24use thiserror::Error;
25use tokio::{sync::oneshot, task::JoinHandle};
26use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
27use tracing::{instrument, Instrument, Span};
28
29use zebra_chain::{
30    block::{self, Height},
31    parameters::{checkpoint::list::CheckpointList, Network},
32};
33
34use zebra_node_services::mempool;
35use zebra_state as zs;
36
37use crate::{
38    block::{Request, SemanticBlockVerifier, VerifyBlockError},
39    checkpoint::{CheckpointVerifier, VerifyCheckpointError},
40    error::TransactionError,
41    transaction, BoxError, Config,
42};
43
44pub mod service_trait;
45
46#[cfg(test)]
47mod tests;
48
/// The bound for the chain verifier and transaction verifier buffers.
///
/// [`init`] passes this value to `Buffer::new` for both the block verifier
/// router and the transaction verifier.
///
/// We choose the verifier buffer bound based on the maximum number of
/// concurrent verifier users, to avoid contention:
///   - the `ChainSync` block download and verify stream
///   - the `Inbound` block download and verify stream
///   - the `Mempool` transaction download and verify stream
///   - a block miner component, which we might add in future, and
///   - 1 extra slot to avoid contention.
///
/// That is four expected users plus one spare slot, for a total of five.
///
/// We deliberately add extra slots, because they only cost a small amount of
/// memory, but missing slots can significantly slow down Zebra.
const VERIFIER_BUFFER_BOUND: usize = 5;
62
/// The block verifier router routes requests to either the checkpoint verifier or the
/// semantic block verifier, depending on the maximum checkpoint height.
///
/// Blocks whose coinbase height is at or below `max_checkpoint_height` go to the
/// checkpoint verifier; every other block goes to the semantic block verifier.
///
/// # Type parameters
///
/// - `S`: the state service, which answers `zs::Request`s with `zs::Response`s.
/// - `V`: the transaction verifier service, which answers `transaction::Request`s
///   with `transaction::Response`s.
///
/// # Correctness
///
/// Block verification requests should be wrapped in a timeout, so that
/// out-of-order and invalid requests do not hang indefinitely. See the [`router`](`crate::router`)
/// module documentation for details.
struct BlockVerifierRouter<S, V>
where
    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
    S::Future: Send + 'static,
    V: Service<transaction::Request, Response = transaction::Response, Error = BoxError>
        + Send
        + Clone
        + 'static,
    V::Future: Send + 'static,
{
    /// The checkpointing block verifier.
    ///
    /// Always used for blocks before `Canopy`, optionally used for the entire checkpoint list.
    checkpoint: CheckpointVerifier<S>,

    /// The highest permitted checkpoint block.
    ///
    /// This height must be in the `checkpoint` verifier's checkpoint list.
    ///
    /// Blocks at exactly this height are still routed to the checkpoint verifier.
    max_checkpoint_height: block::Height,

    /// The full semantic block verifier, used for blocks after `max_checkpoint_height`.
    ///
    /// It also receives blocks that have no coinbase height.
    block: SemanticBlockVerifier<S, V>,
}
94
/// An error while semantically verifying a block.
///
/// Each variant wraps the error returned by the verifier that handled the block.
/// The `Display` implementation is written by hand, rather than generated from
/// `#[error(...)]` attributes.
//
// One or both of the wrapped error types are at least 140 bytes, which is presumably
// why they are stored behind a `Box`.
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum RouterError {
    /// Block could not be checkpointed
    ///
    /// `source` is the error from the checkpoint verifier.
    Checkpoint { source: Box<VerifyCheckpointError> },
    /// Block could not be full-verified
    ///
    /// `source` is the error from the semantic block verifier.
    Block { source: Box<VerifyBlockError> },
}
106
107impl fmt::Display for RouterError {
108    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109        f.write_str(&match self {
110            RouterError::Checkpoint { source } => {
111                format!("block could not be checkpointed due to: {source}")
112            }
113            RouterError::Block { source } => {
114                format!("block could not be full-verified due to: {source}")
115            }
116        })
117    }
118}
119
120impl From<VerifyCheckpointError> for RouterError {
121    fn from(err: VerifyCheckpointError) -> Self {
122        RouterError::Checkpoint {
123            source: Box::new(err),
124        }
125    }
126}
127
128impl From<VerifyBlockError> for RouterError {
129    fn from(err: VerifyBlockError) -> Self {
130        RouterError::Block {
131            source: Box::new(err),
132        }
133    }
134}
135
136impl RouterError {
137    /// Returns `true` if this is definitely a duplicate request.
138    /// Some duplicate requests might not be detected, and therefore return `false`.
139    pub fn is_duplicate_request(&self) -> bool {
140        match self {
141            RouterError::Checkpoint { source, .. } => source.is_duplicate_request(),
142            RouterError::Block { source, .. } => source.is_duplicate_request(),
143        }
144    }
145
146    /// Returns a suggested misbehaviour score increment for a certain error.
147    pub fn misbehavior_score(&self) -> u32 {
148        // TODO: Adjust these values based on zcashd (#9258).
149        match self {
150            RouterError::Checkpoint { source } => source.misbehavior_score(),
151            RouterError::Block { source } => source.misbehavior_score(),
152        }
153    }
154}
155
156impl<S, V> Service<Request> for BlockVerifierRouter<S, V>
157where
158    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
159    S::Future: Send + 'static,
160    V: Service<transaction::Request, Response = transaction::Response, Error = BoxError>
161        + Send
162        + Clone
163        + 'static,
164    V::Future: Send + 'static,
165{
166    type Response = block::Hash;
167    type Error = RouterError;
168    type Future =
169        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
170
171    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
172        // CORRECTNESS
173        //
174        // The current task must be scheduled for wakeup every time we return
175        // `Poll::Pending`.
176        //
177        // If either verifier is unready, this task is scheduled for wakeup when it becomes
178        // ready.
179        //
180        // We acquire checkpoint readiness before block readiness, to avoid an unlikely
181        // hang during the checkpoint to block verifier transition. If the checkpoint and
182        // block verifiers are contending for the same buffer/batch, we want the checkpoint
183        // verifier to win, so that checkpoint verification completes, and block verification
184        // can start. (Buffers and batches have multiple slots, so this contention is unlikely.)
185        use futures::ready;
186        // The chain verifier holds one slot in each verifier, for each concurrent task.
187        // Therefore, any shared buffers or batches polled by these verifiers should double
188        // their bounds. (For example, the state service buffer.)
189        ready!(self.checkpoint.poll_ready(cx))?;
190        ready!(self.block.poll_ready(cx))?;
191        Poll::Ready(Ok(()))
192    }
193
194    fn call(&mut self, request: Request) -> Self::Future {
195        let block = request.block();
196
197        match block.coinbase_height() {
198            // There's currently no known use case for block proposals below the checkpoint height,
199            // so it's okay to immediately return an error here.
200            Some(height) if height <= self.max_checkpoint_height && request.is_proposal() => {
201                async {
202                    // TODO: Add a `ValidateProposalError` enum with a `BelowCheckpoint` variant?
203                    Err(VerifyBlockError::ValidateProposal(
204                        "block proposals must be above checkpoint height".into(),
205                    ))?
206                }
207                .boxed()
208            }
209
210            Some(height) if height <= self.max_checkpoint_height => {
211                self.checkpoint.call(block).map_err(Into::into).boxed()
212            }
213            // This also covers blocks with no height, which the block verifier
214            // will reject immediately.
215            _ => self.block.call(request).map_err(Into::into).boxed(),
216        }
217    }
218}
219
/// Initialize block and transaction verification services.
///
/// Returns a block verifier, transaction verifier,
/// a [`BackgroundTaskHandles`] with the state checkpoint verify task,
/// and the maximum configured checkpoint verification height.
///
/// The consensus configuration is specified by `config`, and the Zcash network
/// to verify blocks for is specified by `network`.
///
/// The block verification service asynchronously performs semantic verification
/// checks. Blocks that pass semantic verification are submitted to the supplied
/// `state_service` for contextual verification before being committed to the chain.
///
/// The transaction verification service asynchronously performs semantic verification
/// checks. Transactions that pass semantic verification return an `Ok` result to the caller.
///
/// The `mempool` receiver is passed to the transaction verifier's constructor unchanged.
///
/// This function should only be called once for a particular state service.
///
/// Dropped requests are cancelled on a best-effort basis, but may continue to be processed.
///
/// # Panics
///
/// - If the state service fails to become ready, or returns an error for the `Tip` request.
/// - If the state service answers the `Tip` request with any other response type.
///
/// The spawned state checkpoint verify task panics separately, inside that task, if:
/// - a block hash in the state does not match the checkpoint hash at the same height, or
/// - the state service answers a `BestChainBlockHash` request with any other response type.
///
/// # Correctness
///
/// Block and transaction verification requests should be wrapped in a timeout,
/// so that out-of-order and invalid requests do not hang indefinitely.
/// See the [`router`](`crate::router`) module documentation for details.
#[instrument(skip(state_service, mempool))]
pub async fn init<S, Mempool>(
    config: Config,
    network: &Network,
    mut state_service: S,
    mempool: oneshot::Receiver<Mempool>,
) -> (
    Buffer<BoxService<Request, block::Hash, RouterError>, Request>,
    Buffer<
        BoxService<transaction::Request, transaction::Response, TransactionError>,
        transaction::Request,
    >,
    BackgroundTaskHandles,
    Height,
)
where
    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
    S::Future: Send + 'static,
    Mempool: Service<mempool::Request, Response = mempool::Response, Error = BoxError>
        + Send
        + Clone
        + 'static,
    Mempool::Future: Send + 'static,
{
    // Give other tasks priority before spawning the checkpoint task.
    tokio::task::yield_now().await;

    // Make sure the state contains the known best chain checkpoints, in a separate task.

    // The spawned `async move` block takes ownership of everything it uses,
    // so it gets its own copies of the service, the sync flag, and the network.
    let checkpoint_state_service = state_service.clone();
    let checkpoint_sync = config.checkpoint_sync;
    let checkpoint_network = network.clone();

    let state_checkpoint_verify_handle = tokio::task::spawn(
        // TODO: move this into an async function?
        async move {
            tracing::info!("starting state checkpoint validation");

            // # Consensus
            //
            // We want to verify all available checkpoints, even if the node is not configured
            // to use them for syncing. Zebra's checkpoints are updated with every release,
            // which makes sure they include the latest settled network upgrade.
            //
            // > A network upgrade is settled on a given network when there is a social
            // > consensus that it has activated with a given activation block hash.
            // > A full validator that potentially risks Mainnet funds or displays Mainnet
            // > transaction information to a user MUST do so only for a block chain that
            // > includes the activation block of the most recent settled network upgrade,
            // > with the corresponding activation block hash. Currently, there is social
            // > consensus that NU5 has activated on the Zcash Mainnet and Testnet with the
            // > activation block hashes given in § 3.12 ‘Mainnet and Testnet’ on p. 20.
            //
            // <https://zips.z.cash/protocol/protocol.pdf#blockchain>
            let full_checkpoints = checkpoint_network.checkpoint_list();
            // Limits the state error warning below to a single log line.
            let mut already_warned = false;

            for (height, checkpoint_hash) in full_checkpoints.iter() {
                // Each request is sent through a fresh clone of the service.
                let checkpoint_state_service = checkpoint_state_service.clone();
                let request = zebra_state::Request::BestChainBlockHash(*height);

                match checkpoint_state_service.oneshot(request).await {
                    // The state has a block at this height: it must be the checkpoint block.
                    Ok(zebra_state::Response::BlockHash(Some(state_hash))) => assert_eq!(
                        *checkpoint_hash, state_hash,
                        "invalid block in state: a previous Zebra instance followed an \
                         incorrect chain. Delete and re-sync your state to use the best chain"
                    ),

                    // The state has no block at this height, so stop checking here.
                    Ok(zebra_state::Response::BlockHash(None)) => {
                        if checkpoint_sync {
                            tracing::info!(
                                "state is not fully synced yet, remaining checkpoints will be \
                                 verified during syncing"
                            );
                        } else {
                            tracing::warn!(
                                "state is not fully synced yet, remaining checkpoints will be \
                                 verified next time Zebra starts up. Zebra will be less secure \
                                 until it is restarted. Use consensus.checkpoint_sync = true \
                                 in zebrad.toml to make sure you are following a valid chain"
                            );
                        }

                        break;
                    }

                    Ok(response) => {
                        unreachable!("unexpected response type: {response:?} from state request")
                    }
                    // There is no `break` in this arm, so the loop keeps querying the
                    // remaining checkpoints after a state error.
                    Err(e) => {
                        // This error happens a lot in some tests, and it could happen to users.
                        if !already_warned {
                            tracing::warn!(
                                "unexpected error: {e:?} in state request while verifying previous \
                                 state checkpoints. Is Zebra shutting down?"
                            );
                            already_warned = true;
                        }
                    }
                }
            }

            tracing::info!("finished state checkpoint validation");
        }
        // Run the task inside the span that is current when `init` spawns it.
        .instrument(Span::current()),
    );

    // transaction verification

    let transaction = transaction::Verifier::new(network, state_service.clone(), mempool);
    let transaction = Buffer::new(BoxService::new(transaction), VERIFIER_BUFFER_BOUND);

    // block verification
    let (list, max_checkpoint_height) = init_checkpoint_list(config, network);

    // Ask the state for its current tip, which is passed to the checkpoint verifier below.
    let tip = match state_service
        .ready()
        .await
        .unwrap()
        .call(zs::Request::Tip)
        .await
        .unwrap()
    {
        zs::Response::Tip(tip) => tip,
        _ => unreachable!("wrong response to Request::Tip"),
    };
    tracing::info!(
        ?tip,
        ?max_checkpoint_height,
        "initializing block verifier router"
    );

    // The block verifier gets a clone of the buffered transaction verifier;
    // the original handle is returned to the caller.
    let block = SemanticBlockVerifier::new(network, state_service.clone(), transaction.clone());
    // This is the last use of `state_service`, so it is moved rather than cloned.
    let checkpoint = CheckpointVerifier::from_checkpoint_list(list, network, tip, state_service);
    let router = BlockVerifierRouter {
        checkpoint,
        max_checkpoint_height,
        block,
    };

    let router = Buffer::new(BoxService::new(router), VERIFIER_BUFFER_BOUND);

    let task_handles = BackgroundTaskHandles {
        state_checkpoint_verify_handle,
    };

    (router, transaction, task_handles, max_checkpoint_height)
}
393
394/// Parses the checkpoint list for `network` and `config`.
395/// Returns the checkpoint list and maximum checkpoint height.
396pub fn init_checkpoint_list(config: Config, network: &Network) -> (Arc<CheckpointList>, Height) {
397    // TODO: Zebra parses the checkpoint list three times at startup.
398    //       Instead, cache the checkpoint list for each `network`.
399    let list = network.checkpoint_list();
400
401    let max_checkpoint_height = if config.checkpoint_sync {
402        list.max_height()
403    } else {
404        list.min_height_in_range(network.mandatory_checkpoint_height()..)
405            .expect("hardcoded checkpoint list extends past canopy activation")
406    };
407
408    (list, max_checkpoint_height)
409}
410
/// The background task handles for `zebra-consensus` verifier initialization.
///
/// [`init`] creates this struct and returns it alongside the verifiers.
#[derive(Debug)]
pub struct BackgroundTaskHandles {
    /// A handle to the state checkpoint verify task.
    /// Finishes when all the checkpoints are verified, or when the state tip is reached.
    ///
    /// The task is spawned by [`init`] and has no return value.
    pub state_checkpoint_verify_handle: JoinHandle<()>,
}
418
/// Test helper that runs [`init`] without a real mempool.
///
/// The mempool setup receiver given to [`init`] belongs to a channel whose sender is
/// never used, so no mempool service is ever delivered to the verifiers.
///
/// See [`init`] for more details.
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test<S>(
    config: Config,
    network: &Network,
    state_service: S,
) -> (
    Buffer<BoxService<Request, block::Hash, RouterError>, Request>,
    Buffer<
        BoxService<transaction::Request, transaction::Response, TransactionError>,
        transaction::Request,
    >,
    BackgroundTaskHandles,
    Height,
)
where
    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
    S::Future: Send + 'static,
{
    // The concrete mempool service type is only needed to name the channel's item type.
    type UnusedMempool =
        Buffer<BoxService<mempool::Request, mempool::Response, BoxError>, mempool::Request>;

    // The sender is held, unused, until this function returns.
    let (_mempool_setup_tx, mempool_setup_rx) = oneshot::channel::<UnusedMempool>();

    let verifiers = init(
        config.clone(),
        network,
        state_service.clone(),
        mempool_setup_rx,
    );

    verifiers.await
}