//! ckb-sync 1.3.0
//!
//! The ckb sync/relayer protocols implementation.
use crate::SyncShared;
use crate::relayer::compact_block_verifier::CompactBlockVerifier;
use crate::relayer::{ReconstructionResult, Relayer};
use crate::types::ActiveChain;
use crate::utils::async_send_message_to;
use crate::{Status, StatusCode, attempt};
use ckb_chain_spec::consensus::Consensus;
use ckb_logger::{self, debug_target};
use ckb_network::{CKBProtocolContext, PeerIndex};
use ckb_shared::block_status::BlockStatus;
use ckb_shared::types::HeaderIndex;
use ckb_systemtime::unix_time_as_millis;
use ckb_traits::{HeaderFields, HeaderFieldsProvider};
use ckb_types::{
    core::{EpochNumberWithFraction, HeaderView},
    packed::{self, Byte32, CompactBlock},
    prelude::*,
};
use ckb_util::shrink_to_fit;
use ckb_verification::{HeaderError, HeaderVerifier};
use ckb_verification_traits::Verifier;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

// Keeping in mind that short_ids are expected to occasionally collide.
// On receiving a compact-block message, if the reconstructed block has a
// different transactions_root:
// 1. if all the transactions are prefilled, the node should ban the peer
//    but not mark the block invalid, because the block hash may be wrong;
// 2. otherwise, there may be a short_id collision in the transaction pool,
//    so the node falls back to requesting all the short_ids from the peer.
/// One-shot processor for a single incoming `CompactBlock` relay message.
pub struct CompactBlockProcess<'a> {
    // Borrowed molecule reader over the raw received message bytes.
    message: packed::CompactBlockReader<'a>,
    // Relayer protocol handler, giving access to the shared sync state.
    relayer: &'a Relayer,
    // Network context used to send follow-up requests to peers.
    nc: Arc<dyn CKBProtocolContext + Sync>,
    // The peer that sent this compact block.
    peer: PeerIndex,
}

impl<'a> CompactBlockProcess<'a> {
    /// Bundles the message, relayer handle, network context, and sender peer
    /// into a one-shot processor; [`Self::execute`] consumes it.
    pub fn new(
        message: packed::CompactBlockReader<'a>,
        relayer: &'a Relayer,
        nc: Arc<dyn CKBProtocolContext + Sync>,
        peer: PeerIndex,
    ) -> Self {
        CompactBlockProcess {
            message,
            nc,
            relayer,
            peer,
        }
    }

    /// Processes the received compact block end to end:
    /// non-contextual checks, contextual checks (block status, parent lookup,
    /// header verification), compact-block structure verification, proposal
    /// requests, block reconstruction from the tx pool, and finally either
    /// accepting the reconstructed block or asking the peer for the missing
    /// transactions/uncles.
    pub async fn execute(self) -> Status {
        // Start timing here so the verify-duration metric below covers the
        // whole processing path, not just block acceptance.
        let instant = Instant::now();
        let shared = self.relayer.shared();
        let active_chain = shared.active_chain();
        let compact_block = self.message.to_entity();
        let header = compact_block.header().into_view();
        let block_hash = header.hash();

        let status =
            non_contextual_check(&compact_block, &header, shared.consensus(), &active_chain);
        if !status.is_ok() {
            return status;
        }

        let status = contextual_check(&header, shared, &active_chain, &self.nc, self.peer).await;
        if !status.is_ok() {
            return status;
        }

        // The newly arrived block has greater difficulty than the local best
        // known chain; verify the compact block's internal structure.
        attempt!(CompactBlockVerifier::verify(&compact_block));
        // Header has been verified ok, update state
        shared.insert_valid_header(self.peer, &header);

        // Request the proposal transactions referenced by this block.
        let proposals: Vec<_> = compact_block.proposals().into_iter().collect();
        self.relayer.request_proposal_txs(
            &self.nc,
            self.peer,
            (header.number(), block_hash.clone()).into(),
            proposals,
        );

        // Reconstruct the full block from short ids using the local tx pool
        // (no extra transactions/uncles are supplied on this first attempt).
        let ret = self
            .relayer
            .reconstruct_block(&active_chain, &compact_block, vec![], &[], &[])
            .await;

        // Accept block
        // `relayer.accept_block` will make sure the validity of block before persisting
        // into database
        match ret {
            ReconstructionResult::Block(block) => {
                if let Some(metrics) = ckb_metrics::handle() {
                    metrics
                        .ckb_relay_cb_transaction_count
                        .inc_by(block.transactions().len() as u64);
                    metrics.ckb_relay_cb_reconstruct_ok.inc();
                }
                let mut pending_compact_blocks = shared.state().pending_compact_blocks().await;
                pending_compact_blocks.remove(&block_hash);
                // remove all pending request below this block epoch
                //
                // use epoch as the judgment condition because we accept
                // all block in current epoch as uncle block
                pending_compact_blocks.retain(|_, (v, _, _)| {
                    Into::<EpochNumberWithFraction>::into(v.header().as_reader().raw().epoch())
                        .number()
                        >= block.epoch().number()
                });
                shrink_to_fit!(pending_compact_blocks, 20);
                self.relayer
                    .accept_block(Arc::clone(&self.nc), self.peer, block, "CompactBlock");

                if let Some(metrics) = ckb_metrics::handle() {
                    metrics
                        .ckb_relay_cb_verify_duration
                        .observe(instant.elapsed().as_secs_f64());
                }
                Status::ok()
            }
            ReconstructionResult::Missing(transactions, uncles) => {
                // Some short ids did not match any pooled transaction; ask the
                // peer for exactly those indexes.
                let missing_transactions: Vec<u32> =
                    transactions.into_iter().map(|i| i as u32).collect();

                if let Some(metrics) = ckb_metrics::handle() {
                    metrics
                        .ckb_relay_cb_fresh_tx_cnt
                        .inc_by(missing_transactions.len() as u64);
                    metrics.ckb_relay_cb_reconstruct_fail.inc();
                }

                let missing_uncles: Vec<u32> = uncles.into_iter().map(|i| i as u32).collect();
                missing_or_collided_post_process(
                    compact_block,
                    block_hash.clone(),
                    shared,
                    self.nc,
                    missing_transactions,
                    missing_uncles,
                    self.peer,
                )
                .await;

                StatusCode::CompactBlockRequiresFreshTransactions.with_context(&block_hash)
            }
            ReconstructionResult::Collided => {
                // Short-id collision in the tx pool: fall back to requesting
                // every non-prefilled transaction from the peer.
                let missing_transactions: Vec<u32> = compact_block
                    .short_id_indexes()
                    .into_iter()
                    .map(|i| i as u32)
                    .collect();
                let missing_uncles: Vec<u32> = vec![];
                missing_or_collided_post_process(
                    compact_block,
                    block_hash.clone(),
                    shared,
                    self.nc,
                    missing_transactions,
                    missing_uncles,
                    self.peer,
                )
                .await;
                StatusCode::CompactBlockMeetsShortIdsCollision.with_context(&block_hash)
            }
            ReconstructionResult::Error(status) => status,
        }
    }
}

/// A `HeaderFieldsProvider` backed by a caller-supplied lookup closure, used
/// to run header verification against headers that may only exist in the
/// pending-compact-block table and not yet in the store.
struct CompactBlockMedianTimeView<'a> {
    // Returns the header fields for the given hash, or `None` when unknown.
    fn_get_pending_header: Box<dyn Fn(packed::Byte32) -> Option<HeaderFields> + 'a>,
}

impl<'a> HeaderFieldsProvider for CompactBlockMedianTimeView<'a> {
    /// Delegates entirely to the injected closure; the closure is expected to
    /// consult both the pending compact blocks and the header map, so no
    /// additional store query is performed here.
    fn get_header_fields(&self, hash: &packed::Byte32) -> Option<HeaderFields> {
        let lookup = &self.fn_get_pending_header;
        lookup(hash.clone())
    }
}

/// Stateless sanity checks on a received compact block:
/// * uncle count must not exceed the consensus `max_uncles_num`
/// * proposal count must not exceed the consensus `max_block_proposals_limit`
/// * the block number must not be more than one epoch length behind the tip
fn non_contextual_check(
    compact_block: &CompactBlock,
    header: &HeaderView,
    consensus: &Consensus,
    active_chain: &ActiveChain,
) -> Status {
    let uncles_count = compact_block.uncles().len();
    if uncles_count > consensus.max_uncles_num() {
        return StatusCode::ProtocolMessageIsMalformed.with_context(format!(
            "CompactBlock uncles count({}) > consensus max_uncles_num({})",
            uncles_count,
            consensus.max_uncles_num()
        ));
    }

    let proposals_count = compact_block.proposals().len() as u64;
    if proposals_count > consensus.max_block_proposals_limit() {
        return StatusCode::ProtocolMessageIsMalformed.with_context(format!(
            "CompactBlock proposals count({}) > consensus max_block_proposals_limit({})",
            proposals_count,
            consensus.max_block_proposals_limit(),
        ));
    }

    // Only accept blocks whose height is greater than `tip - N`, where N is
    // the current epoch length: anything older cannot be an uncle candidate.
    let tip_number = active_chain.tip_header().number();
    let lowest_number = tip_number.saturating_sub(active_chain.epoch_ext().length());
    if header.number() < lowest_number {
        return StatusCode::CompactBlockIsStaled.with_context(header.hash());
    }

    Status::ok()
}

/// Contextual validation of the compact block header:
/// * bail out if the block is already stored, already received (orphan pool),
///   or known invalid
/// * require the parent header to be known, otherwise send `GetHeaders` to
///   the peer
/// * skip compact blocks this peer already has pending
/// * run full header verification against a view that can also resolve
///   headers still pending as compact blocks
async fn contextual_check(
    compact_block_header: &HeaderView,
    shared: &Arc<SyncShared>,
    active_chain: &ActiveChain,
    nc: &Arc<dyn CKBProtocolContext + Sync>,
    peer: PeerIndex,
) -> Status {
    let block_hash = compact_block_header.hash();
    let tip = active_chain.tip_header();

    let status = active_chain.get_block_status(&block_hash);
    if status.contains(BlockStatus::BLOCK_STORED) {
        // update last common header and best known
        let parent = shared
            .get_header_index_view(&compact_block_header.data().raw().parent_hash(), true)
            .expect("parent block must exist");

        // Total difficulty of this header = parent's total + its own.
        let header_index = HeaderIndex::new(
            compact_block_header.number(),
            block_hash.clone(),
            parent.total_difficulty() + compact_block_header.difficulty(),
        );
        let state = shared.state().peers();
        state.may_set_best_known_header(peer, header_index);

        return StatusCode::CompactBlockAlreadyStored.with_context(block_hash);
    } else if status.contains(BlockStatus::BLOCK_RECEIVED) {
        // block already in orphan pool
        return Status::ignored();
    } else if status.contains(BlockStatus::BLOCK_INVALID) {
        return StatusCode::BlockIsInvalid.with_context(block_hash);
    }

    // When the block is at most one ahead of the tip, its parent is likely
    // already persisted, so prefer the store lookup first.
    let store_first = tip.number() + 1 >= compact_block_header.number();
    let parent = shared.get_header_index_view(
        &compact_block_header.data().raw().parent_hash(),
        store_first,
    );
    if parent.is_none() {
        // Unknown parent: ask the peer for headers starting from our tip.
        debug_target!(
            crate::LOG_TARGET_RELAY,
            "UnknownParent: {}, send_getheaders_to_peer({})",
            block_hash,
            peer
        );
        active_chain.send_getheaders_to_peer(nc, peer, (&tip).into());
        return StatusCode::CompactBlockRequiresParent.with_context(format!(
            "{} parent: {}",
            block_hash,
            compact_block_header.data().raw().parent_hash(),
        ));
    }

    // compact block is in pending
    // Note: this guard stays held through the header verification below so
    // the lookup closure can read pending headers consistently.
    let pending_compact_blocks = shared.state().pending_compact_blocks().await;
    if pending_compact_blocks
        .get(&block_hash)
        .map(|(_, peers_map, _)| peers_map.contains_key(&peer))
        .unwrap_or(false)
    {
        return StatusCode::CompactBlockIsAlreadyPending.with_context(block_hash);
    }

    // compact header verification
    // The lookup tries the pending compact blocks first, then falls back to
    // the shared header index view (without querying the store twice).
    let fn_get_pending_header = {
        |block_hash| {
            pending_compact_blocks
                .get(&block_hash)
                .map(|(compact_block, _, _)| {
                    let header = compact_block.header().into_view();
                    HeaderFields {
                        hash: header.hash(),
                        number: header.number(),
                        epoch: header.epoch(),
                        timestamp: header.timestamp(),
                        parent_hash: header.parent_hash(),
                    }
                })
                .or_else(|| {
                    shared
                        .get_header_index_view(&block_hash, false)
                        .map(|header| HeaderFields {
                            hash: header.hash(),
                            number: header.number(),
                            epoch: header.epoch(),
                            timestamp: header.timestamp(),
                            parent_hash: header.parent_hash(),
                        })
                })
        }
    };
    let median_time_context = CompactBlockMedianTimeView {
        fn_get_pending_header: Box::new(fn_get_pending_header),
    };
    let header_verifier = HeaderVerifier::new(&median_time_context, shared.consensus());
    if let Err(err) = header_verifier.verify(compact_block_header) {
        if err
            .downcast_ref::<HeaderError>()
            .map(|e| e.is_too_new())
            .unwrap_or(false)
        {
            // A timestamp merely too far in the future is ignored, not banned.
            return Status::ignored();
        } else {
            // Any other header error marks the block invalid permanently.
            shared
                .shared()
                .insert_block_status(block_hash.clone(), BlockStatus::BLOCK_INVALID);
            return StatusCode::CompactBlockHasInvalidHeader
                .with_context(format!("{block_hash} {err}"));
        }
    }

    Status::ok()
}

/// Records the missing transaction/uncle indexes for `peer` in the
/// pending-compact-block table, then asks that peer for them with a
/// `GetBlockTransactions` message (sent asynchronously, best-effort).
async fn missing_or_collided_post_process(
    compact_block: CompactBlock,
    block_hash: Byte32,
    shared: &SyncShared,
    nc: Arc<dyn CKBProtocolContext + Sync>,
    missing_transactions: Vec<u32>,
    missing_uncles: Vec<u32>,
    peer: PeerIndex,
) {
    // Build the request message first, from slices, so the index vectors can
    // be moved into the pending table below instead of being cloned.
    let content = packed::GetBlockTransactions::new_builder()
        .block_hash(block_hash.clone())
        .indexes(missing_transactions.as_slice())
        .uncle_indexes(missing_uncles.as_slice())
        .build();
    let message = packed::RelayMessage::new_builder().set(content).build();

    shared
        .state()
        .pending_compact_blocks()
        .await
        .entry(block_hash)
        .or_insert_with(|| (compact_block, HashMap::default(), unix_time_as_millis()))
        .1
        .insert(peer, (missing_transactions, missing_uncles));

    // Fire-and-forget: a send failure is only logged, never escalated.
    shared.shared().async_handle().spawn(async move {
        let sending = async_send_message_to(&nc, peer, &message).await;
        if !sending.is_ok() {
            ckb_logger::warn_target!(
                crate::LOG_TARGET_RELAY,
                "ignore the sending message error, error: {}",
                sending
            );
        }
    });
}