use core::fmt;
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use futures::{FutureExt, TryFutureExt};
use thiserror::Error;
use tokio::{sync::oneshot, task::JoinHandle};
use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
use tracing::{instrument, Instrument, Span};
use zebra_chain::{
block::{self, Height},
parameters::{checkpoint::list::CheckpointList, Network},
};
use zebra_node_services::mempool;
use zebra_state as zs;
use crate::{
block::{Request, SemanticBlockVerifier, VerifyBlockError},
checkpoint::{CheckpointVerifier, VerifyCheckpointError},
error::TransactionError,
transaction, BoxError, Config,
};
// Public submodule; its contents are defined outside this file.
pub mod service_trait;
// Test submodule, only compiled for test builds.
#[cfg(test)]
mod tests;
/// Bound passed to `Buffer::new` for both the transaction verifier and the
/// block verifier router that `init` creates.
const VERIFIER_BUFFER_BOUND: usize = 5;
/// Routes block verification requests to either the checkpoint verifier or
/// the semantic block verifier, based on the block's coinbase height.
///
/// Blocks whose coinbase height is at or below `max_checkpoint_height` are
/// sent to `checkpoint` (unless the request is a proposal, which is rejected
/// in that range). Every other block, including blocks with no coinbase
/// height, is sent to `block`.
///
/// `S` is the state service and `V` is the transaction verifier service.
struct BlockVerifierRouter<S, V>
where
    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
    S::Future: Send + 'static,
    V: Service<transaction::Request, Response = transaction::Response, Error = BoxError>
        + Send
        + Clone
        + 'static,
    V::Future: Send + 'static,
{
    /// Verifier used for blocks at or below `max_checkpoint_height`.
    checkpoint: CheckpointVerifier<S>,
    /// Highest block height that is routed to the checkpoint verifier.
    max_checkpoint_height: block::Height,
    /// Verifier used for all blocks that are not routed to `checkpoint`.
    block: SemanticBlockVerifier<S, V>,
}
/// An error returned by the block verifier router.
///
/// The `source` field of each variant is reported as the error's source by
/// the derived `Error` implementation; the `Display` text is implemented by hand.
// The sources are boxed, presumably to keep this enum small — TODO confirm.
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum RouterError {
    /// Wraps an error produced by the checkpoint verifier.
    Checkpoint { source: Box<VerifyCheckpointError> },
    /// Wraps a `VerifyBlockError`, either produced by the semantic block
    /// verifier or created by the router itself when it rejects a proposal.
    Block { source: Box<VerifyBlockError> },
}
impl fmt::Display for RouterError {
    /// Writes a message naming the verification path that failed, followed by
    /// the `Display` text of the wrapped error.
    ///
    /// The message is written straight into `f`, so no intermediate `String`
    /// is allocated and a formatting error from the wrapped error is returned
    /// to the caller instead of causing a panic.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            RouterError::Checkpoint { source } => {
                write!(f, "block could not be checkpointed due to: {source}")
            }
            RouterError::Block { source } => {
                write!(f, "block could not be full-verified due to: {source}")
            }
        }
    }
}
impl From<VerifyCheckpointError> for RouterError {
fn from(err: VerifyCheckpointError) -> Self {
RouterError::Checkpoint {
source: Box::new(err),
}
}
}
impl From<VerifyBlockError> for RouterError {
fn from(err: VerifyBlockError) -> Self {
RouterError::Block {
source: Box::new(err),
}
}
}
impl RouterError {
pub fn is_duplicate_request(&self) -> bool {
match self {
RouterError::Checkpoint { source, .. } => source.is_duplicate_request(),
RouterError::Block { source, .. } => source.is_duplicate_request(),
}
}
pub fn misbehavior_score(&self) -> u32 {
match self {
RouterError::Checkpoint { source } => source.misbehavior_score(),
RouterError::Block { source } => source.misbehavior_score(),
}
}
}
impl<S, V> Service<Request> for BlockVerifierRouter<S, V>
where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static,
V: Service<transaction::Request, Response = transaction::Response, Error = BoxError>
+ Send
+ Clone
+ 'static,
V::Future: Send + 'static,
{
type Response = block::Hash;
type Error = RouterError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
use futures::ready;
ready!(self.checkpoint.poll_ready(cx))?;
ready!(self.block.poll_ready(cx))?;
Poll::Ready(Ok(()))
}
fn call(&mut self, request: Request) -> Self::Future {
let block = request.block();
match block.coinbase_height() {
Some(height) if height <= self.max_checkpoint_height && request.is_proposal() => {
async {
Err(VerifyBlockError::ValidateProposal(
"block proposals must be above checkpoint height".into(),
))?
}
.boxed()
}
Some(height) if height <= self.max_checkpoint_height => {
self.checkpoint.call(block).map_err(Into::into).boxed()
}
_ => self.block.call(request).map_err(Into::into).boxed(),
}
}
}
#[instrument(skip(state_service, mempool))]
pub async fn init<S, Mempool>(
config: Config,
network: &Network,
mut state_service: S,
mempool: oneshot::Receiver<Mempool>,
) -> (
Buffer<BoxService<Request, block::Hash, RouterError>, Request>,
Buffer<
BoxService<transaction::Request, transaction::Response, TransactionError>,
transaction::Request,
>,
BackgroundTaskHandles,
Height,
)
where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static,
Mempool: Service<mempool::Request, Response = mempool::Response, Error = BoxError>
+ Send
+ Clone
+ 'static,
Mempool::Future: Send + 'static,
{
tokio::task::yield_now().await;
let checkpoint_state_service = state_service.clone();
let checkpoint_sync = config.checkpoint_sync;
let checkpoint_network = network.clone();
let state_checkpoint_verify_handle = tokio::task::spawn(
async move {
tracing::info!("starting state checkpoint validation");
let full_checkpoints = checkpoint_network.checkpoint_list();
let mut already_warned = false;
for (height, checkpoint_hash) in full_checkpoints.iter() {
let checkpoint_state_service = checkpoint_state_service.clone();
let request = zebra_state::Request::BestChainBlockHash(*height);
match checkpoint_state_service.oneshot(request).await {
Ok(zebra_state::Response::BlockHash(Some(state_hash))) => assert_eq!(
*checkpoint_hash, state_hash,
"invalid block in state: a previous Zebra instance followed an \
incorrect chain. Delete and re-sync your state to use the best chain"
),
Ok(zebra_state::Response::BlockHash(None)) => {
if checkpoint_sync {
tracing::info!(
"state is not fully synced yet, remaining checkpoints will be \
verified during syncing"
);
} else {
tracing::warn!(
"state is not fully synced yet, remaining checkpoints will be \
verified next time Zebra starts up. Zebra will be less secure \
until it is restarted. Use consensus.checkpoint_sync = true \
in zebrad.toml to make sure you are following a valid chain"
);
}
break;
}
Ok(response) => {
unreachable!("unexpected response type: {response:?} from state request")
}
Err(e) => {
if !already_warned {
tracing::warn!(
"unexpected error: {e:?} in state request while verifying previous \
state checkpoints. Is Zebra shutting down?"
);
already_warned = true;
}
}
}
}
tracing::info!("finished state checkpoint validation");
}
.instrument(Span::current()),
);
let transaction = transaction::Verifier::new(network, state_service.clone(), mempool);
let transaction = Buffer::new(BoxService::new(transaction), VERIFIER_BUFFER_BOUND);
let (list, max_checkpoint_height) = init_checkpoint_list(config, network);
let tip = match state_service
.ready()
.await
.unwrap()
.call(zs::Request::Tip)
.await
.unwrap()
{
zs::Response::Tip(tip) => tip,
_ => unreachable!("wrong response to Request::Tip"),
};
tracing::info!(
?tip,
?max_checkpoint_height,
"initializing block verifier router"
);
let block = SemanticBlockVerifier::new(network, state_service.clone(), transaction.clone());
let checkpoint = CheckpointVerifier::from_checkpoint_list(list, network, tip, state_service);
let router = BlockVerifierRouter {
checkpoint,
max_checkpoint_height,
block,
};
let router = Buffer::new(BoxService::new(router), VERIFIER_BUFFER_BOUND);
let task_handles = BackgroundTaskHandles {
state_checkpoint_verify_handle,
};
(router, transaction, task_handles, max_checkpoint_height)
}
/// Returns the checkpoint list of `network` together with the maximum
/// checkpoint height selected by `config`.
///
/// With `config.checkpoint_sync` enabled the height is the list's
/// `max_height()`. Otherwise it is the result of `min_height_in_range` over
/// the heights starting at the network's mandatory checkpoint height.
///
/// # Panics
///
/// If checkpoint sync is disabled and the list has no checkpoint in that range.
pub fn init_checkpoint_list(config: Config, network: &Network) -> (Arc<CheckpointList>, Height) {
    let checkpoints = network.checkpoint_list();

    if config.checkpoint_sync {
        let highest_checkpoint = checkpoints.max_height();
        return (checkpoints, highest_checkpoint);
    }

    let mandatory_checkpoint = checkpoints
        .min_height_in_range(network.mandatory_checkpoint_height()..)
        .expect("hardcoded checkpoint list extends past canopy activation");

    (checkpoints, mandatory_checkpoint)
}
/// Join handles of the background tasks spawned by `init`.
#[derive(Debug)]
pub struct BackgroundTaskHandles {
    /// Handle of the task that compares the checkpoint list against the
    /// blocks that are already in the state.
    pub state_checkpoint_verify_handle: JoinHandle<()>,
}
/// Calls [`init`] with a mempool receiver whose channel never carries a
/// mempool service, and returns what [`init`] returns.
///
/// Only available in test builds or with the `proptest-impl` feature.
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test<S>(
    config: Config,
    network: &Network,
    state_service: S,
) -> (
    Buffer<BoxService<Request, block::Hash, RouterError>, Request>,
    Buffer<
        BoxService<transaction::Request, transaction::Response, TransactionError>,
        transaction::Request,
    >,
    BackgroundTaskHandles,
    Height,
)
where
    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
    S::Future: Send + 'static,
{
    // `config` and `state_service` are owned and not used again, so they are
    // passed on by value instead of being cloned.
    init(
        config,
        network,
        state_service,
        // Only the receiver half is kept, so no mempool service is ever sent.
        oneshot::channel::<
            Buffer<BoxService<mempool::Request, mempool::Response, BoxError>, mempool::Request>,
        >()
        .1,
    )
    .await
}